Compare commits
9 Commits
c851e82990
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e1c32f22c | ||
|
|
f470d6d958 | ||
|
|
28f4c63e99 | ||
|
|
dd4c6b95b7 | ||
|
|
b658053711 | ||
|
|
20c1d738be | ||
|
|
ecfa7cea39 | ||
|
|
ef18915807 | ||
|
|
69976196cd |
30
TASKS.md
30
TASKS.md
@@ -1,6 +1,32 @@
|
||||
# derp - Tasks
|
||||
|
||||
## Current Sprint -- MusicBrainz Fallback (2026-02-23)
|
||||
## Current Sprint -- Discovery Playlists (2026-02-23)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | `!similar` default: discover + resolve + play (playlist mode) |
|
||||
| P0 | [x] | `!similar list` subcommand for display-only (old default) |
|
||||
| P0 | [x] | `_search_queries()` normalizes Last.fm/MB results to search strings |
|
||||
| P0 | [x] | `_resolve_playlist()` parallel yt-dlp resolution via ThreadPoolExecutor |
|
||||
| P1 | [x] | Playback transition: fade out, clear queue, load playlist, fade in |
|
||||
| P1 | [x] | Fallback to display when music plugin not loaded |
|
||||
| P1 | [x] | Tests: 11 new cases (81 total in test_lastfm.py, 1949 suite total) |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, TASKS.md) |
|
||||
|
||||
## Previous Sprint -- Enhanced Help with FlaskPaste (2026-02-23)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | `!help <cmd>` pastes docstring detail via FlaskPaste, appends URL |
|
||||
| P0 | [x] | `!help <plugin>` pastes all plugin command details |
|
||||
| P0 | [x] | `!help` (no args) pastes full reference grouped by plugin |
|
||||
| P1 | [x] | 3-level hierarchy: plugin (col 0), command (indent 4), docstring (indent 8) |
|
||||
| P1 | [x] | Graceful fallback when FlaskPaste not loaded or paste fails |
|
||||
| P1 | [x] | Helper functions: `_build_cmd_detail(indent=)`, `_paste` |
|
||||
| P1 | [x] | Tests: 7 new cases in test_core.py (11 total) |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, TASKS.md) |
|
||||
|
||||
## Previous Sprint -- MusicBrainz Fallback (2026-02-23)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
@@ -306,6 +332,8 @@
|
||||
|
||||
| Date | Task |
|
||||
|------|------|
|
||||
| 2026-02-23 | `!similar` discovery playlists (parallel resolve, fade transition, list subcommand) |
|
||||
| 2026-02-23 | Enhanced `!help` with FlaskPaste detail pages (docstrings, grouped reference) |
|
||||
| 2026-02-23 | MusicBrainz fallback for `!similar` and `!tags` (no Last.fm key required) |
|
||||
| 2026-02-22 | v2.3.0 (voice profiles, rubberband FX, multi-bot, self-mute, container tools) |
|
||||
| 2026-02-21 | v2.3.0 (pymumble rewrite, music playback, fades, seek, kept library) |
|
||||
|
||||
@@ -86,15 +86,20 @@ Profile data written on graceful shutdown when bot runs with `--cprofile`.
|
||||
|
||||
```
|
||||
!ping # Pong
|
||||
!help # List commands
|
||||
!help <cmd> # Command help
|
||||
!help <plugin> # Plugin description + commands
|
||||
!help # List commands + paste full reference
|
||||
!help <cmd> # Command help + paste docstring detail
|
||||
!help <plugin> # Plugin info + paste command details
|
||||
!version # Bot version
|
||||
!uptime # Bot uptime
|
||||
!echo <text> # Echo text back
|
||||
!h # Shorthand (any unambiguous prefix works)
|
||||
```
|
||||
|
||||
Detailed help is pasted to FlaskPaste and appended as a URL. Paste
|
||||
layout uses a 3-level hierarchy: `[plugin]` at column 0, `!command`
|
||||
at indent 4, docstring body at indent 8. Falls back gracefully if
|
||||
FlaskPaste is not loaded.
|
||||
|
||||
## Permission Tiers
|
||||
|
||||
```
|
||||
@@ -608,14 +613,17 @@ Mumble-only: `!play` replies with error on other adapters, others silently no-op
|
||||
## Music Discovery
|
||||
|
||||
```
|
||||
!similar # Similar to currently playing track
|
||||
!similar <artist> # Similar artists to named artist
|
||||
!similar play # Queue a random similar track
|
||||
!similar play <artist># Queue similar track for named artist
|
||||
!similar # Discover + play similar to current track
|
||||
!similar <artist> # Discover + play similar to named artist
|
||||
!similar list # Show similar (display only)
|
||||
!similar list <artist># Show similar for named artist
|
||||
!tags # Genre tags for current artist
|
||||
!tags <artist> # Genre tags for named artist
|
||||
```
|
||||
|
||||
Default `!similar` builds a playlist: discovers similar artists, resolves
|
||||
via YouTube in parallel, fades out current, plays the new playlist.
|
||||
`!similar list` shows results without playing.
|
||||
Uses Last.fm when API key is set; falls back to MusicBrainz automatically.
|
||||
Config: `[lastfm] api_key` or `LASTFM_API_KEY` env var.
|
||||
|
||||
|
||||
@@ -124,9 +124,9 @@ unchanged. The server name is derived from the hostname automatically.
|
||||
| Command | Description |
|
||||
|---------|-------------|
|
||||
| `!ping` | Bot responds with "pong" |
|
||||
| `!help` | List all available commands |
|
||||
| `!help <cmd>` | Show help for a specific command |
|
||||
| `!help <plugin>` | Show plugin description and its commands |
|
||||
| `!help` | List all commands + paste full reference |
|
||||
| `!help <cmd>` | Show help + paste detailed docstring |
|
||||
| `!help <plugin>` | Show plugin description + paste command details |
|
||||
| `!version` | Show bot version |
|
||||
| `!uptime` | Show how long the bot has been running |
|
||||
| `!echo <text>` | Echo back text (example plugin) |
|
||||
@@ -206,6 +206,30 @@ unchanged. The server name is derived from the hostname automatically.
|
||||
| `!cron <add\|del\|list>` | Scheduled command execution (admin) |
|
||||
| `!webhook` | Show webhook listener status (admin) |
|
||||
|
||||
### Detailed Help (FlaskPaste)
|
||||
|
||||
`!help` pastes detailed reference output to FlaskPaste and appends the
|
||||
URL. The paste uses a 3-level indentation hierarchy:
|
||||
|
||||
```
|
||||
[plugin-name]
|
||||
Plugin description.
|
||||
|
||||
!command -- short help
|
||||
Full docstring with usage, subcommands,
|
||||
and examples.
|
||||
|
||||
!other -- another command
|
||||
Its docstring here.
|
||||
```
|
||||
|
||||
- `!help` (no args) -- pastes the full reference grouped by plugin
|
||||
- `!help <cmd>` -- pastes the command's docstring (command at column 0)
|
||||
- `!help <plugin>` -- pastes all commands under the plugin header
|
||||
|
||||
If FlaskPaste is not loaded or the paste fails, the short IRC reply
|
||||
still works -- no regression.
|
||||
|
||||
### Command Shorthand
|
||||
|
||||
Commands can be abbreviated to any unambiguous prefix:
|
||||
@@ -1767,19 +1791,22 @@ key is configured; falls back to MusicBrainz automatically (no key
|
||||
required).
|
||||
|
||||
```
|
||||
!similar Similar to currently playing track
|
||||
!similar <artist> Similar artists to named artist
|
||||
!similar play Queue a random similar track
|
||||
!similar play <artist> Queue a similar track for named artist
|
||||
!similar Discover + play similar to current track
|
||||
!similar <artist> Discover + play similar to named artist
|
||||
!similar list Show similar (display only)
|
||||
!similar list <artist> Show similar for named artist
|
||||
!tags Genre tags for currently playing artist
|
||||
!tags <artist> Genre tags for named artist
|
||||
```
|
||||
|
||||
- Default `!similar` builds a discovery playlist: finds similar artists/tracks,
|
||||
resolves each against YouTube in parallel, fades out current playback, and
|
||||
starts the new playlist
|
||||
- `!similar list` shows results without playing (old default behavior)
|
||||
- When an API key is set, Last.fm is tried first for richer results
|
||||
- When no API key is set (or Last.fm returns empty), MusicBrainz is
|
||||
used as a fallback (artist search -> tags -> similar recordings)
|
||||
- `!similar play` picks a random result and delegates to `!play`
|
||||
(searches YouTube for the artist + title)
|
||||
- Without the music plugin loaded, `!similar` falls back to display mode
|
||||
- MusicBrainz rate limit: 1 request/second (handled automatically)
|
||||
|
||||
Configuration (optional):
|
||||
|
||||
@@ -1,11 +1,42 @@
|
||||
"""Core plugin: ping, help, version, plugin management."""
|
||||
|
||||
import asyncio
|
||||
import textwrap
|
||||
from collections import Counter
|
||||
|
||||
from derp import __version__
|
||||
from derp.plugin import command
|
||||
|
||||
|
||||
def _build_cmd_detail(handler, prefix: str, indent: int = 0) -> str:
|
||||
"""Format command header + docstring at the given indent level.
|
||||
|
||||
Command name sits at *indent*, docstring body at *indent + 4*.
|
||||
Returns just the header line when no docstring exists.
|
||||
"""
|
||||
pad = " " * indent
|
||||
header = f"{pad}{prefix}{handler.name}"
|
||||
if handler.help:
|
||||
header += f" -- {handler.help}"
|
||||
doc = textwrap.dedent(handler.callback.__doc__ or "").strip()
|
||||
if not doc:
|
||||
return header
|
||||
indented = textwrap.indent(doc, " " * (indent + 4))
|
||||
return f"{header}\n{indented}"
|
||||
|
||||
|
||||
async def _paste(bot, text: str) -> str | None:
|
||||
"""Create a paste via FlaskPaste. Returns URL or None."""
|
||||
fp = bot.registry._modules.get("flaskpaste")
|
||||
if not fp:
|
||||
return None
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
return await loop.run_in_executor(None, fp.create_paste, bot, text)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@command("ping", help="Check if the bot is alive")
|
||||
async def cmd_ping(bot, message):
|
||||
"""Respond with pong."""
|
||||
@@ -27,7 +58,13 @@ async def cmd_help(bot, message):
|
||||
handler = bot.registry.commands.get(name)
|
||||
if handler and bot._plugin_allowed(handler.plugin, channel):
|
||||
help_text = handler.help or "No help available."
|
||||
await bot.reply(message, f"{bot.prefix}{name} -- {help_text}")
|
||||
reply = f"{bot.prefix}{name} -- {help_text}"
|
||||
if (handler.callback.__doc__ or "").strip():
|
||||
detail = _build_cmd_detail(handler, bot.prefix)
|
||||
url = await _paste(bot, detail)
|
||||
if url:
|
||||
reply += f" | {url}"
|
||||
await bot.reply(message, reply)
|
||||
return
|
||||
|
||||
# Check plugin
|
||||
@@ -41,7 +78,24 @@ async def cmd_help(bot, message):
|
||||
lines = [f"{name} -- {desc}" if desc else name]
|
||||
if cmds:
|
||||
lines.append(f"Commands: {', '.join(bot.prefix + c for c in cmds)}")
|
||||
await bot.reply(message, " | ".join(lines))
|
||||
reply = " | ".join(lines)
|
||||
# Build detail: plugin header + indented commands
|
||||
section_lines = [f"[{name}]"]
|
||||
if desc:
|
||||
section_lines.append(f" {desc}")
|
||||
section_lines.append("")
|
||||
has_detail = False
|
||||
for cmd_name in cmds:
|
||||
h = bot.registry.commands[cmd_name]
|
||||
section_lines.append(_build_cmd_detail(h, bot.prefix, indent=4))
|
||||
section_lines.append("")
|
||||
if (h.callback.__doc__ or "").strip():
|
||||
has_detail = True
|
||||
if has_detail:
|
||||
url = await _paste(bot, "\n".join(section_lines).rstrip())
|
||||
if url:
|
||||
reply += f" | {url}"
|
||||
await bot.reply(message, reply)
|
||||
return
|
||||
|
||||
await bot.reply(message, f"Unknown command or plugin: {name}")
|
||||
@@ -52,7 +106,31 @@ async def cmd_help(bot, message):
|
||||
k for k, v in bot.registry.commands.items()
|
||||
if bot._plugin_allowed(v.plugin, channel)
|
||||
)
|
||||
await bot.reply(message, ", ".join(names))
|
||||
reply = ", ".join(names)
|
||||
|
||||
# Build full reference grouped by plugin
|
||||
plugins: dict[str, list[str]] = {}
|
||||
for cmd_name in names:
|
||||
h = bot.registry.commands[cmd_name]
|
||||
plugins.setdefault(h.plugin, []).append(cmd_name)
|
||||
sections = []
|
||||
for plugin_name in sorted(plugins):
|
||||
mod = bot.registry._modules.get(plugin_name)
|
||||
desc = (getattr(mod, "__doc__", "") or "").split("\n")[0].strip() if mod else ""
|
||||
section_lines = [f"[{plugin_name}]"]
|
||||
if desc:
|
||||
section_lines.append(f" {desc}")
|
||||
section_lines.append("")
|
||||
for cmd_name in plugins[plugin_name]:
|
||||
h = bot.registry.commands[cmd_name]
|
||||
section_lines.append(_build_cmd_detail(h, bot.prefix, indent=4))
|
||||
section_lines.append("")
|
||||
sections.append("\n".join(section_lines).rstrip())
|
||||
if sections:
|
||||
url = await _paste(bot, "\n\n".join(sections))
|
||||
if url:
|
||||
reply += f" | {url}"
|
||||
await bot.reply(message, reply)
|
||||
|
||||
|
||||
@command("version", help="Show bot version")
|
||||
|
||||
@@ -105,26 +105,41 @@ def _parse_title(raw_title: str) -> tuple[str, str]:
|
||||
return ("", raw_title)
|
||||
|
||||
|
||||
def _current_meta(bot) -> tuple[str, str]:
|
||||
"""Extract artist and title from the currently playing track.
|
||||
def _music_bot(bot):
|
||||
"""Return the bot instance that owns music playback.
|
||||
|
||||
Returns (artist, title). Either or both may be empty.
|
||||
Tries the music plugin's current track metadata on this bot first,
|
||||
then checks peer bots (shared registry) so extra bots can see what
|
||||
the music bot is playing.
|
||||
Checks the calling bot first, then peer bots via the shared registry.
|
||||
Returns the first bot with an active music state, or ``bot`` as fallback.
|
||||
"""
|
||||
# Check this bot first, then peers
|
||||
candidates = [bot]
|
||||
for peer in getattr(getattr(bot, "registry", None), "_bots", {}).values():
|
||||
if peer is not bot:
|
||||
candidates.append(peer)
|
||||
for b in candidates:
|
||||
music_ps = getattr(b, "_pstate", {}).get("music", {})
|
||||
current = music_ps.get("current")
|
||||
if current is not None:
|
||||
raw_title = current.title or ""
|
||||
if raw_title:
|
||||
return _parse_title(raw_title)
|
||||
if music_ps.get("current") is not None or music_ps.get("queue"):
|
||||
return b
|
||||
# No active music state -- prefer a bot that allows the music plugin
|
||||
for b in candidates:
|
||||
only = getattr(b, "_only_plugins", None)
|
||||
if only is not None and "music" in only:
|
||||
return b
|
||||
return bot
|
||||
|
||||
|
||||
def _current_meta(bot) -> tuple[str, str]:
|
||||
"""Extract artist and title from the currently playing track.
|
||||
|
||||
Returns (artist, title). Either or both may be empty.
|
||||
Checks the music bot (via ``_music_bot``) for now-playing metadata.
|
||||
"""
|
||||
mb = _music_bot(bot)
|
||||
music_ps = getattr(mb, "_pstate", {}).get("music", {})
|
||||
current = music_ps.get("current")
|
||||
if current is not None:
|
||||
raw_title = current.title or ""
|
||||
if raw_title:
|
||||
return _parse_title(raw_title)
|
||||
return ("", "")
|
||||
|
||||
|
||||
@@ -192,30 +207,141 @@ def _fmt_match(m: float | str) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
# -- Playlist helpers --------------------------------------------------------
|
||||
|
||||
|
||||
def _search_queries(similar: list[dict], similar_artists: list[dict],
|
||||
mb_results: list[dict], limit: int = 10) -> list[str]:
|
||||
"""Normalize discovery results into YouTube search strings.
|
||||
|
||||
Processes track results (``{artist: {name}, name}``), artist results
|
||||
(``{name}``), and MusicBrainz results (``{artist, title}``) into a
|
||||
flat list of search query strings, up to *limit*.
|
||||
"""
|
||||
queries: list[str] = []
|
||||
for t in similar:
|
||||
a = t.get("artist", {}).get("name", "")
|
||||
n = t.get("name", "")
|
||||
q = f"{a} {n}".strip()
|
||||
if q:
|
||||
queries.append(q)
|
||||
for a in similar_artists:
|
||||
name = a.get("name", "")
|
||||
if name:
|
||||
queries.append(name)
|
||||
for r in mb_results:
|
||||
q = f"{r.get('artist', '')} {r.get('title', '')}".strip()
|
||||
if q:
|
||||
queries.append(q)
|
||||
return queries[:limit]
|
||||
|
||||
|
||||
async def _resolve_playlist(bot, queries: list[str],
|
||||
requester: str) -> list:
|
||||
"""Resolve search queries to Track objects via yt-dlp in parallel.
|
||||
|
||||
Uses the music plugin's ``_resolve_tracks`` and ``_Track`` to build
|
||||
a playlist. Returns a list of ``_Track`` objects (empty on failure).
|
||||
"""
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if not music_mod:
|
||||
return []
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
resolve = music_mod._resolve_tracks
|
||||
Track = music_mod._Track
|
||||
|
||||
pool = _get_yt_pool()
|
||||
|
||||
async def _resolve_one(query: str):
|
||||
try:
|
||||
pairs = await loop.run_in_executor(
|
||||
pool, resolve, f"ytsearch1:{query}", 1,
|
||||
)
|
||||
if pairs:
|
||||
url, title = pairs[0]
|
||||
return Track(url=url, title=title, requester=requester)
|
||||
except Exception:
|
||||
log.debug("lastfm: resolve failed for %r", query)
|
||||
return None
|
||||
|
||||
tasks = [_resolve_one(q) for q in queries]
|
||||
results = await asyncio.gather(*tasks)
|
||||
return [t for t in results if t is not None]
|
||||
|
||||
|
||||
_yt_pool = None
|
||||
|
||||
|
||||
def _get_yt_pool():
|
||||
"""Lazy-init a shared ThreadPoolExecutor for yt-dlp resolution."""
|
||||
global _yt_pool
|
||||
if _yt_pool is None:
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
_yt_pool = ThreadPoolExecutor(max_workers=4)
|
||||
return _yt_pool
|
||||
|
||||
|
||||
async def _display_results(bot, message, similar: list[dict],
|
||||
similar_artists: list[dict],
|
||||
mb_results: list[dict],
|
||||
artist: str, title: str) -> None:
|
||||
"""Format and display discovery results (list mode)."""
|
||||
if similar:
|
||||
lines = [f"Similar to {artist} - {title}:"]
|
||||
for t in similar[:8]:
|
||||
t_artist = t.get("artist", {}).get("name", "")
|
||||
t_name = t.get("name", "?")
|
||||
match = _fmt_match(t.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {t_artist} - {t_name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
return
|
||||
|
||||
search_artist = artist or title
|
||||
if similar_artists:
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for a in similar_artists[:8]:
|
||||
name = a.get("name", "?")
|
||||
match = _fmt_match(a.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar artists")
|
||||
return
|
||||
|
||||
if mb_results:
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for r in mb_results[:8]:
|
||||
lines.append(f" {r['artist']} - {r['title']}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
return
|
||||
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
|
||||
|
||||
# -- Commands ----------------------------------------------------------------
|
||||
|
||||
|
||||
@command("similar", help="Music: !similar [artist|play] -- find similar music")
|
||||
@command("similar", help="Music: !similar [list] [artist] -- discover & play similar music")
|
||||
async def cmd_similar(bot, message):
|
||||
"""Find similar artists or tracks.
|
||||
"""Discover and play similar music.
|
||||
|
||||
Usage:
|
||||
!similar Similar to currently playing track
|
||||
!similar <artist> Similar artists to named artist
|
||||
!similar play Queue a random similar track
|
||||
!similar play <artist> Queue a similar track for named artist
|
||||
!similar Discover + play similar to current track
|
||||
!similar <artist> Discover + play similar to named artist
|
||||
!similar list Show similar (display only)
|
||||
!similar list <artist> Show similar for named artist
|
||||
"""
|
||||
api_key = _get_api_key(bot)
|
||||
|
||||
parts = message.text.split(None, 2)
|
||||
# !similar play [artist]
|
||||
play_mode = len(parts) >= 2 and parts[1].lower() == "play"
|
||||
if play_mode:
|
||||
# !similar list [artist]
|
||||
list_mode = len(parts) >= 2 and parts[1].lower() == "list"
|
||||
if list_mode:
|
||||
query = parts[2].strip() if len(parts) > 2 else ""
|
||||
else:
|
||||
query = parts[1].strip() if len(parts) > 1 else ""
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# Resolve artist from query or current track
|
||||
@@ -229,8 +355,8 @@ async def cmd_similar(bot, message):
|
||||
return
|
||||
|
||||
# -- Last.fm path --
|
||||
similar = []
|
||||
similar_artists = []
|
||||
similar: list[dict] = []
|
||||
similar_artists: list[dict] = []
|
||||
if api_key:
|
||||
# Try track-level similarity first if we have both artist + title
|
||||
if artist and title:
|
||||
@@ -245,7 +371,7 @@ async def cmd_similar(bot, message):
|
||||
)
|
||||
|
||||
# -- MusicBrainz fallback --
|
||||
mb_results = []
|
||||
mb_results: list[dict] = []
|
||||
if not similar and not similar_artists:
|
||||
search_artist = artist or title
|
||||
try:
|
||||
@@ -267,77 +393,47 @@ async def cmd_similar(bot, message):
|
||||
except Exception:
|
||||
log.warning("lastfm: MusicBrainz fallback failed", exc_info=True)
|
||||
|
||||
# -- Track-level results (Last.fm) --
|
||||
if similar:
|
||||
if play_mode:
|
||||
pick = random.choice(similar[:10])
|
||||
pick_artist = pick.get("artist", {}).get("name", "")
|
||||
pick_title = pick.get("name", "")
|
||||
search = f"{pick_artist} {pick_title}".strip()
|
||||
if not search:
|
||||
await bot.reply(message, "No playable result found")
|
||||
return
|
||||
message.text = f"!play {search}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
|
||||
lines = [f"Similar to {artist} - {title}:"]
|
||||
for t in similar[:8]:
|
||||
t_artist = t.get("artist", {}).get("name", "")
|
||||
t_name = t.get("name", "?")
|
||||
match = _fmt_match(t.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {t_artist} - {t_name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
return
|
||||
|
||||
# -- Artist-level results (Last.fm) --
|
||||
if similar_artists:
|
||||
# Nothing found at all
|
||||
if not similar and not similar_artists and not mb_results:
|
||||
search_artist = artist or title
|
||||
if play_mode:
|
||||
pick = random.choice(similar_artists[:10])
|
||||
pick_name = pick.get("name", "")
|
||||
if not pick_name:
|
||||
await bot.reply(message, "No playable result found")
|
||||
return
|
||||
message.text = f"!play {pick_name}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for a in similar_artists[:8]:
|
||||
name = a.get("name", "?")
|
||||
match = _fmt_match(a.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar artists")
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
return
|
||||
|
||||
# -- MusicBrainz results --
|
||||
if mb_results:
|
||||
search_artist = artist or title
|
||||
if play_mode:
|
||||
pick = random.choice(mb_results[:10])
|
||||
search = f"{pick['artist']} {pick['title']}".strip()
|
||||
message.text = f"!play {search}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for r in mb_results[:8]:
|
||||
lines.append(f" {r['artist']} - {r['title']}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
# -- List mode (display only) --
|
||||
if list_mode:
|
||||
await _display_results(bot, message, similar, similar_artists,
|
||||
mb_results, artist, title)
|
||||
return
|
||||
|
||||
# Nothing found
|
||||
# -- Play mode (default): build playlist and transition --
|
||||
search_artist = artist or title
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
queries = _search_queries(similar, similar_artists, mb_results, limit=10)
|
||||
if not queries:
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
return
|
||||
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if not music_mod:
|
||||
# No music plugin -- fall back to display
|
||||
await _display_results(bot, message, similar, similar_artists,
|
||||
mb_results, artist, title)
|
||||
return
|
||||
|
||||
await bot.reply(message, f"Discovering similar to {search_artist}...")
|
||||
tracks = await _resolve_playlist(bot, queries, message.nick)
|
||||
if not tracks:
|
||||
await bot.reply(message, "No playable tracks resolved")
|
||||
return
|
||||
|
||||
# Transition on the music bot (derp), not the calling bot (may be merlin)
|
||||
dj = _music_bot(bot)
|
||||
ps = music_mod._ps(dj)
|
||||
await music_mod._fade_and_cancel(dj, duration=3.0)
|
||||
ps["queue"].clear()
|
||||
ps["current"] = None
|
||||
ps["queue"] = list(tracks)
|
||||
music_mod._ensure_loop(dj, fade_in=True)
|
||||
await bot.reply(message, f"Playing {len(tracks)} similar tracks for {search_artist}")
|
||||
|
||||
|
||||
@command("tags", help="Music: !tags [artist] -- show genre tags")
|
||||
@@ -353,7 +449,6 @@ async def cmd_tags(bot, message):
|
||||
parts = message.text.split(None, 1)
|
||||
query = parts[1].strip() if len(parts) > 1 else ""
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
if query:
|
||||
|
||||
@@ -22,6 +22,7 @@ where = ["src"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
pythonpath = ["."]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 99
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
from dataclasses import dataclass
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# -- Load plugin module directly ---------------------------------------------
|
||||
@@ -16,9 +18,22 @@ _spec.loader.exec_module(_mod)
|
||||
# -- Fakes -------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class _FakeHandler:
|
||||
name: str
|
||||
callback: object
|
||||
help: str = ""
|
||||
plugin: str = ""
|
||||
admin: bool = False
|
||||
tier: str = "user"
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
def __init__(self):
|
||||
self._bots: dict = {}
|
||||
self.commands: dict = {}
|
||||
self._modules: dict = {}
|
||||
self.events: dict = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
@@ -26,10 +41,14 @@ class _FakeBot:
|
||||
self.replied: list[str] = []
|
||||
self.registry = _FakeRegistry()
|
||||
self.nick = "derp"
|
||||
self.prefix = "!"
|
||||
self._receive_sound = False
|
||||
if mumble:
|
||||
self._mumble = MagicMock()
|
||||
|
||||
def _plugin_allowed(self, plugin: str, channel) -> bool:
|
||||
return True
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
@@ -90,3 +109,180 @@ class TestDeafCommand:
|
||||
msg = _Msg(text="!deaf")
|
||||
asyncio.run(_mod.cmd_deaf(bot, msg))
|
||||
bot._mumble.users.myself.deafen.assert_called_once()
|
||||
|
||||
|
||||
# -- Help command tests ------------------------------------------------------
|
||||
|
||||
|
||||
def _cmd_with_doc():
|
||||
"""Manage widgets.
|
||||
|
||||
Usage:
|
||||
!widget add <name>
|
||||
!widget del <name>
|
||||
|
||||
Examples:
|
||||
!widget add foo
|
||||
"""
|
||||
|
||||
|
||||
def _cmd_no_doc():
|
||||
pass
|
||||
|
||||
|
||||
def _make_fp_module(url="https://paste.example.com/abc/raw", capture=None):
|
||||
"""Create a fake flaskpaste module that returns a fixed URL.
|
||||
|
||||
If capture is a list, appended paste content is stored there.
|
||||
"""
|
||||
mod = types.ModuleType("flaskpaste")
|
||||
|
||||
def _create(bot, text):
|
||||
if capture is not None:
|
||||
capture.append(text)
|
||||
return url
|
||||
|
||||
mod.create_paste = _create
|
||||
return mod
|
||||
|
||||
|
||||
class TestHelpCommand:
|
||||
def test_help_cmd_with_paste(self):
|
||||
"""!help <cmd> with docstring pastes detail, appends URL."""
|
||||
bot = _FakeBot()
|
||||
handler = _FakeHandler(
|
||||
name="widget", callback=_cmd_with_doc,
|
||||
help="Manage widgets", plugin="widgets",
|
||||
)
|
||||
bot.registry.commands["widget"] = handler
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module()
|
||||
msg = _Msg(text="!help widget")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(bot.replied) == 1
|
||||
assert "!widget -- Manage widgets" in bot.replied[0]
|
||||
assert "https://paste.example.com/abc/raw" in bot.replied[0]
|
||||
|
||||
def test_help_cmd_no_docstring(self):
|
||||
"""!help <cmd> without docstring skips paste."""
|
||||
bot = _FakeBot()
|
||||
handler = _FakeHandler(
|
||||
name="noop", callback=_cmd_no_doc,
|
||||
help="Does nothing", plugin="misc",
|
||||
)
|
||||
bot.registry.commands["noop"] = handler
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module()
|
||||
msg = _Msg(text="!help noop")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(bot.replied) == 1
|
||||
assert "!noop -- Does nothing" in bot.replied[0]
|
||||
assert "paste.example.com" not in bot.replied[0]
|
||||
|
||||
def test_help_plugin_with_paste(self):
|
||||
"""!help <plugin> pastes detail for all plugin commands."""
|
||||
bot = _FakeBot()
|
||||
mod = types.ModuleType("widgets")
|
||||
mod.__doc__ = "Widget management plugin."
|
||||
bot.registry._modules["widgets"] = mod
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module()
|
||||
bot.registry.commands["widget"] = _FakeHandler(
|
||||
name="widget", callback=_cmd_with_doc,
|
||||
help="Manage widgets", plugin="widgets",
|
||||
)
|
||||
bot.registry.commands["wstat"] = _FakeHandler(
|
||||
name="wstat", callback=_cmd_no_doc,
|
||||
help="Widget stats", plugin="widgets",
|
||||
)
|
||||
msg = _Msg(text="!help widgets")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(bot.replied) == 1
|
||||
reply = bot.replied[0]
|
||||
assert "widgets -- Widget management plugin." in reply
|
||||
assert "!widget, !wstat" in reply
|
||||
# Only widget has a docstring, so paste should still happen
|
||||
assert "https://paste.example.com/abc/raw" in reply
|
||||
|
||||
def test_help_list_with_paste(self):
|
||||
"""!help (no args) pastes full reference."""
|
||||
bot = _FakeBot()
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module()
|
||||
mod = types.ModuleType("core")
|
||||
mod.__doc__ = "Core plugin."
|
||||
bot.registry._modules["core"] = mod
|
||||
bot.registry.commands["ping"] = _FakeHandler(
|
||||
name="ping", callback=_cmd_with_doc,
|
||||
help="Check alive", plugin="core",
|
||||
)
|
||||
bot.registry.commands["help"] = _FakeHandler(
|
||||
name="help", callback=_cmd_no_doc,
|
||||
help="Show help", plugin="core",
|
||||
)
|
||||
msg = _Msg(text="!help")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(bot.replied) == 1
|
||||
assert "help, ping" in bot.replied[0]
|
||||
assert "https://paste.example.com/abc/raw" in bot.replied[0]
|
||||
|
||||
def test_help_no_flaskpaste(self):
|
||||
"""Without flaskpaste loaded, help still works (no URL)."""
|
||||
bot = _FakeBot()
|
||||
handler = _FakeHandler(
|
||||
name="widget", callback=_cmd_with_doc,
|
||||
help="Manage widgets", plugin="widgets",
|
||||
)
|
||||
bot.registry.commands["widget"] = handler
|
||||
# No flaskpaste in _modules
|
||||
msg = _Msg(text="!help widget")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(bot.replied) == 1
|
||||
assert "!widget -- Manage widgets" in bot.replied[0]
|
||||
assert "https://" not in bot.replied[0]
|
||||
|
||||
def test_help_cmd_paste_hierarchy(self):
|
||||
"""Single-command paste: header at 0, docstring at 4."""
|
||||
bot = _FakeBot()
|
||||
pastes: list[str] = []
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module(capture=pastes)
|
||||
bot.registry.commands["widget"] = _FakeHandler(
|
||||
name="widget", callback=_cmd_with_doc,
|
||||
help="Manage widgets", plugin="widgets",
|
||||
)
|
||||
msg = _Msg(text="!help widget")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(pastes) == 1
|
||||
lines = pastes[0].split("\n")
|
||||
# Level 0: command header flush-left
|
||||
assert lines[0] == "!widget -- Manage widgets"
|
||||
# Level 1: docstring lines indented 4 spaces
|
||||
for line in lines[1:]:
|
||||
if line.strip():
|
||||
assert line.startswith(" "), f"not indented: {line!r}"
|
||||
|
||||
def test_help_list_paste_hierarchy(self):
|
||||
"""Full reference paste: plugin at 0, command at 4, doc at 8."""
|
||||
bot = _FakeBot()
|
||||
pastes: list[str] = []
|
||||
bot.registry._modules["flaskpaste"] = _make_fp_module(capture=pastes)
|
||||
mod = types.ModuleType("core")
|
||||
mod.__doc__ = "Core plugin."
|
||||
bot.registry._modules["core"] = mod
|
||||
bot.registry.commands["state"] = _FakeHandler(
|
||||
name="state", callback=_cmd_with_doc,
|
||||
help="Inspect state", plugin="core",
|
||||
)
|
||||
msg = _Msg(text="!help")
|
||||
asyncio.run(_mod.cmd_help(bot, msg))
|
||||
assert len(pastes) == 1
|
||||
text = pastes[0]
|
||||
lines = text.split("\n")
|
||||
# Level 0: plugin header
|
||||
assert lines[0] == "[core]"
|
||||
# Level 1: plugin description
|
||||
assert lines[1] == " Core plugin."
|
||||
# Blank separator
|
||||
assert lines[2] == ""
|
||||
# Level 1: command header at indent 4
|
||||
assert lines[3] == " !state -- Inspect state"
|
||||
# Level 2: docstring at indent 8
|
||||
for line in lines[4:]:
|
||||
if line.strip():
|
||||
assert line.startswith(" "), f"not at indent 8: {line!r}"
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
@@ -21,14 +20,17 @@ _spec.loader.exec_module(_mod)
|
||||
class _FakeRegistry:
|
||||
def __init__(self):
|
||||
self._modules: dict = {}
|
||||
self._bots: dict = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self, *, api_key: str = "test-key"):
|
||||
def __init__(self, *, api_key: str = "test-key", name: str = "derp"):
|
||||
self.replied: list[str] = []
|
||||
self.config: dict = {"lastfm": {"api_key": api_key}} if api_key else {}
|
||||
self._pstate: dict = {}
|
||||
self._only_plugins: set[str] | None = None
|
||||
self.registry = _FakeRegistry()
|
||||
self._username = name
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
@@ -363,11 +365,54 @@ class TestFmtMatch:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSearchQueries:
|
||||
def test_track_results(self):
|
||||
similar = [
|
||||
{"name": "Track X", "artist": {"name": "Band A"}, "match": "0.9"},
|
||||
{"name": "Track Y", "artist": {"name": "Band B"}, "match": "0.7"},
|
||||
]
|
||||
result = _mod._search_queries(similar, [], [])
|
||||
assert result == ["Band A Track X", "Band B Track Y"]
|
||||
|
||||
def test_artist_results(self):
|
||||
artists = [{"name": "Deftones"}, {"name": "APC"}]
|
||||
result = _mod._search_queries([], artists, [])
|
||||
assert result == ["Deftones", "APC"]
|
||||
|
||||
def test_mb_results(self):
|
||||
mb = [{"artist": "MB Band", "title": "MB Song"}]
|
||||
result = _mod._search_queries([], [], mb)
|
||||
assert result == ["MB Band MB Song"]
|
||||
|
||||
def test_mixed_sources(self):
|
||||
"""Track results come first, then artist, then MB."""
|
||||
similar = [{"name": "T1", "artist": {"name": "A1"}}]
|
||||
artists = [{"name": "A2"}]
|
||||
mb = [{"artist": "MB", "title": "S1"}]
|
||||
result = _mod._search_queries(similar, artists, mb)
|
||||
assert result == ["A1 T1", "A2", "MB S1"]
|
||||
|
||||
def test_limit(self):
|
||||
artists = [{"name": f"Band {i}"} for i in range(20)]
|
||||
result = _mod._search_queries([], artists, [], limit=5)
|
||||
assert len(result) == 5
|
||||
|
||||
def test_skips_empty(self):
|
||||
similar = [{"name": "", "artist": {"name": ""}}]
|
||||
artists = [{"name": ""}]
|
||||
mb = [{"artist": "", "title": ""}]
|
||||
result = _mod._search_queries(similar, artists, mb)
|
||||
assert result == []
|
||||
|
||||
def test_empty_inputs(self):
|
||||
assert _mod._search_queries([], [], []) == []
|
||||
|
||||
|
||||
class TestCmdSimilar:
|
||||
def test_no_api_key_mb_fallback(self):
|
||||
"""No API key falls back to MusicBrainz for similar results."""
|
||||
def test_no_api_key_mb_list_fallback(self):
|
||||
"""No API key + list mode falls back to MusicBrainz for results."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar Tool")
|
||||
msg = _Msg(text="!similar list Tool")
|
||||
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
@@ -390,40 +435,16 @@ class TestCmdSimilar:
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("No similar artists" in r for r in bot.replied)
|
||||
|
||||
def test_no_api_key_play_mode(self):
|
||||
"""No API key + play mode delegates to cmd_play via MB results."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar play Tool")
|
||||
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value="mbid-123"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags",
|
||||
return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "MB Band" in play_called[0]
|
||||
|
||||
def test_no_artist_nothing_playing(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar")
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Nothing playing" in r for r in bot.replied)
|
||||
|
||||
def test_artist_query_shows_similar(self):
|
||||
def test_list_artist_shows_similar(self):
|
||||
"""!similar list <artist> shows similar artists (display only)."""
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
msg = _Msg(text="!similar list Tool")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists",
|
||||
return_value=SIMILAR_ARTISTS_RESP["similarartists"]["artist"]):
|
||||
@@ -431,26 +452,26 @@ class TestCmdSimilar:
|
||||
assert any("Similar to Tool" in r for r in bot.replied)
|
||||
assert any("Artist B" in r for r in bot.replied)
|
||||
|
||||
def test_track_level_similarity(self):
|
||||
"""When current track has artist + title, tries track similarity first."""
|
||||
def test_list_track_level(self):
|
||||
"""!similar list with track results shows track similarity."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
msg = _Msg(text="!similar list")
|
||||
tracks = SIMILAR_TRACKS_RESP["similartracks"]["track"]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Tool - Lateralus" in r for r in bot.replied)
|
||||
assert any("Track X" in r for r in bot.replied)
|
||||
|
||||
def test_falls_back_to_artist(self):
|
||||
"""Falls back to artist similarity when no track results."""
|
||||
def test_list_falls_back_to_artist(self):
|
||||
"""!similar list falls back to artist similarity when no track results."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
msg = _Msg(text="!similar list")
|
||||
artists = SIMILAR_ARTISTS_RESP["similarartists"]["artist"]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
@@ -465,71 +486,178 @@ class TestCmdSimilar:
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("No similar artists" in r for r in bot.replied)
|
||||
|
||||
def test_play_mode_artist(self):
|
||||
"""!similar play delegates to music cmd_play."""
|
||||
def test_list_match_score_displayed(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar play Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "Deftones" in play_called[0]
|
||||
|
||||
def test_play_mode_track(self):
|
||||
"""!similar play with track-level results delegates to cmd_play."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar play")
|
||||
tracks = [{"name": "Schism", "artist": {"name": "Tool"}, "match": "0.9"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "Tool" in play_called[0]
|
||||
assert "Schism" in play_called[0]
|
||||
|
||||
def test_match_score_displayed(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
msg = _Msg(text="!similar list Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.85"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("85%" in r for r in bot.replied)
|
||||
|
||||
def test_current_track_no_separator(self):
|
||||
def test_list_current_track_no_separator(self):
|
||||
"""Title without separator uses whole title as search artist."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
msg = _Msg(text="!similar list")
|
||||
artists = [{"name": "APC", "match": "0.7"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Lateralus" in r for r in bot.replied)
|
||||
|
||||
def test_builds_playlist(self):
|
||||
"""Default !similar builds playlist and starts playback."""
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
fake_tracks = [_FakeTrack(url="http://yt/1", title="Song 1")]
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod._ps.return_value = {
|
||||
"queue": [], "current": None, "task": None,
|
||||
}
|
||||
music_mod._fade_and_cancel = AsyncMock()
|
||||
music_mod._ensure_loop = MagicMock()
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch.object(_mod, "_get_similar_artists", return_value=artists), \
|
||||
patch.object(_mod, "_resolve_playlist",
|
||||
return_value=fake_tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
|
||||
music_mod._fade_and_cancel.assert_called_once()
|
||||
music_mod._ensure_loop.assert_called_once()
|
||||
ps = music_mod._ps(bot)
|
||||
assert ps["queue"] == fake_tracks
|
||||
assert any("Playing 1 similar" in r for r in bot.replied)
|
||||
|
||||
def test_builds_playlist_from_current_track(self):
|
||||
"""!similar with no args discovers from currently playing track."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
tracks = SIMILAR_TRACKS_RESP["similartracks"]["track"]
|
||||
fake_tracks = [_FakeTrack(url="http://yt/1", title="Song 1")]
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod._ps.return_value = {
|
||||
"queue": [], "current": None, "task": None,
|
||||
}
|
||||
music_mod._fade_and_cancel = AsyncMock()
|
||||
music_mod._ensure_loop = MagicMock()
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks), \
|
||||
patch.object(_mod, "_resolve_playlist",
|
||||
return_value=fake_tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
|
||||
assert any("Playing 1 similar" in r for r in bot.replied)
|
||||
assert any("Tool" in r for r in bot.replied)
|
||||
|
||||
def test_no_music_mod_falls_back_to_display(self):
|
||||
"""Without music plugin, !similar falls back to display mode."""
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
# Falls back to display since no music module registered
|
||||
assert any("Similar to Tool" in r for r in bot.replied)
|
||||
assert any("Deftones" in r for r in bot.replied)
|
||||
|
||||
def test_no_playable_tracks_resolved(self):
|
||||
"""Shows error when resolution returns empty."""
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod._ps.return_value = {"queue": [], "current": None}
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch.object(_mod, "_get_similar_artists", return_value=artists), \
|
||||
patch.object(_mod, "_resolve_playlist",
|
||||
return_value=[]):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("No playable tracks" in r for r in bot.replied)
|
||||
|
||||
def test_mb_builds_playlist(self):
|
||||
"""MB fallback results build playlist in play mode."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar Tool")
|
||||
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
|
||||
fake_tracks = [_FakeTrack(url="http://yt/1", title="MB Track")]
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod._ps.return_value = {
|
||||
"queue": [], "current": None, "task": None,
|
||||
}
|
||||
music_mod._fade_and_cancel = AsyncMock()
|
||||
music_mod._ensure_loop = MagicMock()
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value="mbid-123"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags",
|
||||
return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks), \
|
||||
patch.object(_mod, "_resolve_playlist",
|
||||
return_value=fake_tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Playing 1 similar" in r for r in bot.replied)
|
||||
|
||||
def test_cross_bot_delegates_to_music_bot(self):
|
||||
"""When merlin runs !similar, playback starts on derp (music bot)."""
|
||||
# derp = music bot (has active music state)
|
||||
derp = _FakeBot(api_key="test-key", name="derp")
|
||||
derp._only_plugins = {"music", "voice"}
|
||||
derp._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
"queue": [],
|
||||
}
|
||||
# merlin = calling bot (no music plugin)
|
||||
merlin = _FakeBot(api_key="test-key", name="merlin")
|
||||
shared_reg = _FakeRegistry()
|
||||
shared_reg._bots = {"derp": derp, "merlin": merlin}
|
||||
derp.registry = shared_reg
|
||||
merlin.registry = shared_reg
|
||||
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
fake_tracks = [_FakeTrack(url="http://yt/1", title="Song 1")]
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod._ps.return_value = {
|
||||
"queue": [], "current": None, "task": None,
|
||||
}
|
||||
music_mod._fade_and_cancel = AsyncMock()
|
||||
music_mod._ensure_loop = MagicMock()
|
||||
shared_reg._modules["music"] = music_mod
|
||||
|
||||
msg = _Msg(text="!similar Tool")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch.object(_mod, "_get_similar_artists", return_value=artists), \
|
||||
patch.object(_mod, "_resolve_playlist",
|
||||
return_value=fake_tracks):
|
||||
asyncio.run(_mod.cmd_similar(merlin, msg))
|
||||
|
||||
# Music ops must target derp, not merlin
|
||||
music_mod._fade_and_cancel.assert_called_once_with(derp, duration=3.0)
|
||||
music_mod._ensure_loop.assert_called_once_with(derp, fade_in=True)
|
||||
music_mod._ps.assert_called_with(derp)
|
||||
# Reply still goes through merlin (the bot the user talked to)
|
||||
assert any("Playing 1 similar" in r for r in merlin.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdTags
|
||||
@@ -609,6 +737,46 @@ class TestCmdTags:
|
||||
assert any("Lateralus:" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMusicBot
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMusicBot:
|
||||
def test_returns_self_when_active(self):
|
||||
"""Returns calling bot when it has active music state."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="X"), "queue": []}
|
||||
assert _mod._music_bot(bot) is bot
|
||||
|
||||
def test_returns_peer_with_active_state(self):
|
||||
"""Returns peer bot that has music playing."""
|
||||
derp = _FakeBot(name="derp")
|
||||
derp._pstate["music"] = {"current": _FakeTrack(title="X"), "queue": []}
|
||||
merlin = _FakeBot(name="merlin")
|
||||
reg = _FakeRegistry()
|
||||
reg._bots = {"derp": derp, "merlin": merlin}
|
||||
derp.registry = reg
|
||||
merlin.registry = reg
|
||||
assert _mod._music_bot(merlin) is derp
|
||||
|
||||
def test_falls_back_to_only_plugins(self):
|
||||
"""Returns bot with only_plugins containing 'music' when no active state."""
|
||||
derp = _FakeBot(name="derp")
|
||||
derp._only_plugins = {"music", "voice"}
|
||||
merlin = _FakeBot(name="merlin")
|
||||
reg = _FakeRegistry()
|
||||
reg._bots = {"derp": derp, "merlin": merlin}
|
||||
derp.registry = reg
|
||||
merlin.registry = reg
|
||||
assert _mod._music_bot(merlin) is derp
|
||||
|
||||
def test_returns_self_as_last_resort(self):
|
||||
"""Returns calling bot when no peer has music state or filters."""
|
||||
bot = _FakeBot()
|
||||
assert _mod._music_bot(bot) is bot
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestParseTitle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -4,7 +4,6 @@ import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from io import BytesIO
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# -- Load module directly ----------------------------------------------------
|
||||
@@ -305,6 +304,7 @@ class TestMbFindSimilarRecordings:
|
||||
"Tool", ["rock", "metal", "prog"],
|
||||
)
|
||||
call_args = mock_req.call_args
|
||||
query = call_args[1]["query"] if "query" in (call_args[1] or {}) else call_args[0][1].get("query", "")
|
||||
args = call_args[1] or {}
|
||||
query = args.get("query") or call_args[0][1].get("query", "")
|
||||
# Verify the query contains both tag references
|
||||
assert "rock" in query or "metal" in query
|
||||
|
||||
Reference in New Issue
Block a user