Compare commits
16 Commits
5d0e200fbe
...
135a3791e2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
135a3791e2 | ||
|
|
a87f75adf1 | ||
|
|
da9ed51c74 | ||
|
|
56f6b9822f | ||
|
|
09880624d5 | ||
|
|
3c475107e3 | ||
|
|
b3006b02e2 | ||
|
|
8b504364a9 | ||
|
|
40c6bf8c53 | ||
|
|
a76d46b1de | ||
|
|
0ffddb8e41 | ||
|
|
62b01c76f7 | ||
|
|
e0db0ad567 | ||
|
|
c41035ceca | ||
|
|
cd4124e07a | ||
|
|
717bf59a05 |
10
TASKS.md
10
TASKS.md
@@ -9,8 +9,10 @@
|
||||
| P0 | [x] | Instant packet-based ducking via pymumble sound callback (~20ms) |
|
||||
| P0 | [x] | Duck floor raised to 2% (keep music audible during voice) |
|
||||
| P0 | [x] | Strip leading punctuation from voice trigger remainder |
|
||||
| P1 | [ ] | Queue display improvements (`!queue` shows position, duration, total time) |
|
||||
| P1 | [ ] | Playlist save/load (`!playlist save <name>`, `!playlist load <name>`) |
|
||||
| P0 | [x] | Fix greeting tests: move greet TTS to voice plugin `on_connected` |
|
||||
| P0 | [x] | Whisper `initial_prompt` bias for trigger word recognition |
|
||||
| P1 | [x] | Queue display improvements (`!queue` shows elapsed/duration, totals) |
|
||||
| P1 | [x] | Playlist save/load/list/del (`!playlist save <name>`, etc.) |
|
||||
| P2 | [ ] | Per-channel voice settings (different voice per Mumble channel) |
|
||||
|
||||
## Previous Sprint -- Performance: HTTP + Parsing (2026-02-22)
|
||||
@@ -21,7 +23,7 @@
|
||||
| P0 | [x] | `plugins/searx.py` -- route through `derp.http.urlopen(proxy=False)` |
|
||||
| P1 | [x] | Connection pool: `preload_content=True` + `_PooledResponse` wrapper for connection reuse |
|
||||
| P1 | [x] | Pool tuning: `num_pools=30, maxsize=8` (was 20/4) |
|
||||
| P2 | [ ] | Audit remaining plugins for unnecessary proxy routing |
|
||||
| P2 | [x] | Audit remaining plugins for unnecessary proxy routing |
|
||||
|
||||
## Previous Sprint -- Music Discovery via Last.fm (2026-02-22)
|
||||
|
||||
@@ -32,7 +34,7 @@
|
||||
| P0 | [x] | `!similar play` -- queue a similar track via YouTube search |
|
||||
| P1 | [x] | `!tags` command -- show genre/style tags for current or named track |
|
||||
| P1 | [x] | Config: `[lastfm] api_key` or `LASTFM_API_KEY` env var |
|
||||
| P2 | [ ] | Tests: `test_lastfm.py` (API response mocking, command dispatch) |
|
||||
| P2 | [x] | Tests: `test_lastfm.py` (50 cases: API helpers, metadata, commands) |
|
||||
| P2 | [ ] | Documentation update (USAGE.md, CHEATSHEET.md) |
|
||||
|
||||
## Previous Sprint -- v2.3.0 Mumble Voice + Multi-Bot (2026-02-22)
|
||||
|
||||
@@ -18,4 +18,4 @@ services:
|
||||
- ./secrets:/app/secrets:ro,Z
|
||||
environment:
|
||||
- OPENROUTER_API_KEY
|
||||
command: ["--verbose", "--cprofile"]
|
||||
command: ["--verbose"]
|
||||
|
||||
@@ -574,7 +574,7 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
|
||||
!prev # Go back to previous track (fades out)
|
||||
!seek 1:30 # Seek to position (also +30, -30)
|
||||
!resume # Resume last stopped/skipped track
|
||||
!queue # Show queue
|
||||
!queue # Show queue (with durations + totals)
|
||||
!queue <url> # Add to queue (alias for !play)
|
||||
!np # Now playing
|
||||
!volume # Show current volume
|
||||
@@ -587,9 +587,13 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
|
||||
!duck # Show ducking status
|
||||
!duck on # Enable voice ducking
|
||||
!duck off # Disable voice ducking
|
||||
!duck floor 5 # Set duck floor volume (0-100, default 1)
|
||||
!duck floor 5 # Set duck floor volume (0-100, default 2)
|
||||
!duck silence 20 # Set silence timeout seconds (default 15)
|
||||
!duck restore 45 # Set restore ramp duration seconds (default 30)
|
||||
!playlist save <name> # Save current + queued tracks as named playlist
|
||||
!playlist load <name> # Append saved playlist to queue, start if idle
|
||||
!playlist list # Show saved playlists with track counts
|
||||
!playlist del <name> # Delete a saved playlist
|
||||
```
|
||||
|
||||
Requires: `yt-dlp`, `ffmpeg`, `libopus` on the host.
|
||||
|
||||
@@ -1623,13 +1623,17 @@ and voice transmission.
|
||||
!prev Go back to the previous track (fade-out)
|
||||
!seek <offset> Seek to position (1:30, 90, +30, -30)
|
||||
!resume Resume last stopped/skipped track from saved position
|
||||
!queue Show queue
|
||||
!queue Show queue (with durations + totals)
|
||||
!queue <url> Add to queue (alias for !play)
|
||||
!np Now playing
|
||||
!volume [0-100] Get/set volume (persisted across restarts)
|
||||
!keep Keep current track's audio file (with metadata)
|
||||
!kept [rm <id>|clear|repair] List, remove, clear, or repair kept files
|
||||
!testtone Play 3-second 440Hz test tone
|
||||
!playlist save <name> Save current + queued tracks as named playlist
|
||||
!playlist load <name> Append saved playlist to queue, start if idle
|
||||
!playlist list Show saved playlists with track counts
|
||||
!playlist del <name> Delete a saved playlist
|
||||
```
|
||||
|
||||
- Queue holds up to 50 tracks
|
||||
@@ -1712,7 +1716,7 @@ volume gradually restores to the user-set level in small steps.
|
||||
!duck Show ducking status and settings
|
||||
!duck on Enable voice ducking
|
||||
!duck off Disable voice ducking
|
||||
!duck floor <0-100> Set floor volume % (default: 1)
|
||||
!duck floor <0-100> Set floor volume % (default: 2)
|
||||
!duck silence <sec> Set silence timeout in seconds (default: 15)
|
||||
!duck restore <sec> Set restore ramp duration in seconds (default: 30)
|
||||
```
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
1. pymumble: ssl.wrap_socket was removed in 3.13
|
||||
2. opuslib: ctypes.util.find_library fails on musl-based distros
|
||||
3. pymumble: close stale socket on reconnect
|
||||
"""
|
||||
|
||||
import pathlib
|
||||
@@ -65,5 +66,8 @@ new_init = """\
|
||||
self.control_socket = None"""
|
||||
|
||||
assert old_init in src, "pymumble init_connection socket patch target not found"
|
||||
p.write_text(src.replace(old_init, new_init))
|
||||
src = src.replace(old_init, new_init)
|
||||
print("pymumble reconnect socket patch applied")
|
||||
|
||||
p.write_text(src)
|
||||
|
||||
|
||||
118
plugins/_musicbrainz.py
Normal file
118
plugins/_musicbrainz.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""MusicBrainz API helper for music discovery fallback.
|
||||
|
||||
Private module (underscore prefix) -- plugin loader skips it.
|
||||
All functions are blocking; callers should run them in an executor.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from urllib.request import Request
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_BASE = "https://musicbrainz.org/ws/2"
|
||||
_UA = "derp-bot/2.0.0 (https://git.mymx.me/username/derp)"
|
||||
|
||||
# Rate limit: MusicBrainz requires max 1 request/second.
|
||||
# We use 1.1s between calls to stay well within limits.
|
||||
_RATE_INTERVAL = 1.1
|
||||
_last_request: float = 0.0
|
||||
|
||||
|
||||
def _mb_request(path: str, params: dict | None = None) -> dict:
|
||||
"""Rate-limited GET to MusicBrainz API. Blocking."""
|
||||
global _last_request
|
||||
from derp.http import urlopen
|
||||
|
||||
elapsed = time.monotonic() - _last_request
|
||||
if elapsed < _RATE_INTERVAL:
|
||||
time.sleep(_RATE_INTERVAL - elapsed)
|
||||
|
||||
qs = "&".join(f"{k}={v}" for k, v in (params or {}).items())
|
||||
url = f"{_BASE}/{path}?fmt=json&{qs}" if qs else f"{_BASE}/{path}?fmt=json"
|
||||
req = Request(url, headers={"User-Agent": _UA})
|
||||
|
||||
try:
|
||||
resp = urlopen(req, timeout=10, proxy=False)
|
||||
_last_request = time.monotonic()
|
||||
return json.loads(resp.read().decode())
|
||||
except Exception:
|
||||
_last_request = time.monotonic()
|
||||
log.warning("musicbrainz: request failed: %s", path, exc_info=True)
|
||||
return {}
|
||||
|
||||
|
||||
def mb_search_artist(name: str) -> str | None:
|
||||
"""Search for an artist by name, return MBID or None."""
|
||||
from urllib.parse import quote
|
||||
|
||||
data = _mb_request("artist", {"query": quote(name), "limit": "1"})
|
||||
artists = data.get("artists", [])
|
||||
if not artists:
|
||||
return None
|
||||
# Require a reasonable score to avoid false matches
|
||||
score = artists[0].get("score", 0)
|
||||
if score < 50:
|
||||
return None
|
||||
return artists[0].get("id")
|
||||
|
||||
|
||||
def mb_artist_tags(mbid: str) -> list[str]:
|
||||
"""Fetch top 5 tags for an artist by MBID."""
|
||||
data = _mb_request(f"artist/{mbid}", {"inc": "tags"})
|
||||
tags = data.get("tags", [])
|
||||
if not tags:
|
||||
return []
|
||||
# Sort by count descending, take top 5
|
||||
sorted_tags = sorted(tags, key=lambda t: t.get("count", 0), reverse=True)
|
||||
return [t["name"] for t in sorted_tags[:5] if t.get("name")]
|
||||
|
||||
|
||||
def mb_find_similar_recordings(artist: str, tags: list[str],
|
||||
limit: int = 10) -> list[dict]:
|
||||
"""Find recordings by other artists sharing top tags.
|
||||
|
||||
Searches MusicBrainz for recordings tagged with the top 2 tags,
|
||||
excluding the original artist. Returns [{"artist": str, "title": str}].
|
||||
"""
|
||||
from urllib.parse import quote
|
||||
|
||||
if not tags:
|
||||
return []
|
||||
|
||||
# Use top 2 tags for the query
|
||||
tag_query = " AND ".join(f'tag:"{t}"' for t in tags[:2])
|
||||
query = f'({tag_query}) AND NOT artist:"{artist}"'
|
||||
|
||||
data = _mb_request("recording", {
|
||||
"query": quote(query),
|
||||
"limit": str(limit),
|
||||
})
|
||||
recordings = data.get("recordings", [])
|
||||
if not recordings:
|
||||
return []
|
||||
|
||||
seen = set()
|
||||
results = []
|
||||
for rec in recordings:
|
||||
title = rec.get("title", "")
|
||||
credits = rec.get("artist-credit", [])
|
||||
if not credits or not title:
|
||||
continue
|
||||
rec_artist = credits[0].get("name", "") if credits else ""
|
||||
if not rec_artist:
|
||||
continue
|
||||
# Skip the original artist (case-insensitive)
|
||||
if rec_artist.lower() == artist.lower():
|
||||
continue
|
||||
# Deduplicate by artist+title
|
||||
key = f"{rec_artist.lower()}:{title.lower()}"
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
results.append({"artist": rec_artist, "title": title})
|
||||
|
||||
return results
|
||||
@@ -436,7 +436,7 @@ def _search_youtube(keyword: str) -> list[dict]:
|
||||
req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
|
||||
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
|
||||
raw = resp.read()
|
||||
resp.close()
|
||||
|
||||
@@ -545,7 +545,7 @@ def _search_searx(keyword: str) -> list[dict]:
|
||||
})
|
||||
req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET")
|
||||
try:
|
||||
resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
|
||||
resp = _urlopen(req, timeout=_FETCH_TIMEOUT, proxy=False)
|
||||
raw = resp.read()
|
||||
resp.close()
|
||||
except Exception as exc:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -91,6 +92,19 @@ def _search_track(api_key: str, query: str,
|
||||
# -- Metadata extraction -----------------------------------------------------
|
||||
|
||||
|
||||
def _parse_title(raw_title: str) -> tuple[str, str]:
|
||||
"""Split a raw track title into (artist, title).
|
||||
|
||||
Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``.
|
||||
Returns ``("", raw_title)`` if no separator is found.
|
||||
"""
|
||||
for sep in (" - ", " -- ", " | ", " ~ "):
|
||||
if sep in raw_title:
|
||||
parts = raw_title.split(sep, 1)
|
||||
return (parts[0].strip(), parts[1].strip())
|
||||
return ("", raw_title)
|
||||
|
||||
|
||||
def _current_meta(bot) -> tuple[str, str]:
|
||||
"""Extract artist and title from the currently playing track.
|
||||
|
||||
@@ -103,15 +117,60 @@ def _current_meta(bot) -> tuple[str, str]:
|
||||
if current is None:
|
||||
return ("", "")
|
||||
raw_title = current.title or ""
|
||||
return _parse_title(raw_title)
|
||||
|
||||
# Try common "Artist - Title" patterns
|
||||
for sep in (" - ", " -- ", " | ", " ~ "):
|
||||
if sep in raw_title:
|
||||
parts = raw_title.split(sep, 1)
|
||||
return (parts[0].strip(), parts[1].strip())
|
||||
|
||||
# No separator -- treat whole thing as a search query
|
||||
return ("", raw_title)
|
||||
# -- Discovery orchestrator --------------------------------------------------
|
||||
|
||||
|
||||
async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None:
|
||||
"""Find a similar track via Last.fm or MusicBrainz fallback.
|
||||
|
||||
Returns ``(artist, title)`` or ``None`` if nothing found.
|
||||
"""
|
||||
artist, title = _parse_title(last_track_title)
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# -- Last.fm path --
|
||||
api_key = _get_api_key(bot)
|
||||
if api_key and artist:
|
||||
try:
|
||||
similar = await loop.run_in_executor(
|
||||
None, _get_similar_tracks, api_key, artist, title, 20,
|
||||
)
|
||||
if similar:
|
||||
pick = random.choice(similar)
|
||||
pick_artist = pick.get("artist", {}).get("name", "")
|
||||
pick_title = pick.get("name", "")
|
||||
if pick_artist and pick_title:
|
||||
return (pick_artist, pick_title)
|
||||
except Exception:
|
||||
log.warning("lastfm: discover via Last.fm failed", exc_info=True)
|
||||
|
||||
# -- MusicBrainz fallback --
|
||||
if artist:
|
||||
try:
|
||||
from plugins._musicbrainz import (
|
||||
mb_artist_tags,
|
||||
mb_find_similar_recordings,
|
||||
mb_search_artist,
|
||||
)
|
||||
|
||||
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
|
||||
if mbid:
|
||||
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
|
||||
if tags:
|
||||
picks = await loop.run_in_executor(
|
||||
None, mb_find_similar_recordings, artist, tags, 20,
|
||||
)
|
||||
if picks:
|
||||
pick = random.choice(picks)
|
||||
return (pick["artist"], pick["title"])
|
||||
except Exception:
|
||||
log.warning("lastfm: discover via MusicBrainz failed",
|
||||
exc_info=True)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# -- Formatting --------------------------------------------------------------
|
||||
@@ -139,9 +198,6 @@ async def cmd_similar(bot, message):
|
||||
!similar play <artist> Queue a similar track for named artist
|
||||
"""
|
||||
api_key = _get_api_key(bot)
|
||||
if not api_key:
|
||||
await bot.reply(message, "Last.fm API key not configured")
|
||||
return
|
||||
|
||||
parts = message.text.split(None, 2)
|
||||
# !similar play [artist]
|
||||
@@ -164,38 +220,86 @@ async def cmd_similar(bot, message):
|
||||
await bot.reply(message, "Nothing playing and no artist given")
|
||||
return
|
||||
|
||||
# Try track-level similarity first if we have both artist + title
|
||||
# -- Last.fm path --
|
||||
similar = []
|
||||
if artist and title:
|
||||
similar = await loop.run_in_executor(
|
||||
None, _get_similar_tracks, api_key, artist, title,
|
||||
)
|
||||
similar_artists = []
|
||||
if api_key:
|
||||
# Try track-level similarity first if we have both artist + title
|
||||
if artist and title:
|
||||
similar = await loop.run_in_executor(
|
||||
None, _get_similar_tracks, api_key, artist, title,
|
||||
)
|
||||
# Fall back to artist-level similarity
|
||||
if not similar:
|
||||
search_artist = artist or title
|
||||
similar_artists = await loop.run_in_executor(
|
||||
None, _get_similar_artists, api_key, search_artist,
|
||||
)
|
||||
|
||||
# Fall back to artist-level similarity
|
||||
if not similar:
|
||||
# -- MusicBrainz fallback --
|
||||
mb_results = []
|
||||
if not similar and not similar_artists:
|
||||
search_artist = artist or title
|
||||
similar_artists = await loop.run_in_executor(
|
||||
None, _get_similar_artists, api_key, search_artist,
|
||||
)
|
||||
if not similar_artists:
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
try:
|
||||
from plugins._musicbrainz import (
|
||||
mb_artist_tags,
|
||||
mb_find_similar_recordings,
|
||||
mb_search_artist,
|
||||
)
|
||||
mbid = await loop.run_in_executor(
|
||||
None, mb_search_artist, search_artist,
|
||||
)
|
||||
if mbid:
|
||||
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
|
||||
if tags:
|
||||
mb_results = await loop.run_in_executor(
|
||||
None, mb_find_similar_recordings, search_artist,
|
||||
tags, 20,
|
||||
)
|
||||
except Exception:
|
||||
log.warning("lastfm: MusicBrainz fallback failed", exc_info=True)
|
||||
|
||||
# -- Track-level results (Last.fm) --
|
||||
if similar:
|
||||
if play_mode:
|
||||
pick = random.choice(similar[:10])
|
||||
pick_artist = pick.get("artist", {}).get("name", "")
|
||||
pick_title = pick.get("name", "")
|
||||
search = f"{pick_artist} {pick_title}".strip()
|
||||
if not search:
|
||||
await bot.reply(message, "No playable result found")
|
||||
return
|
||||
message.text = f"!play {search}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
|
||||
lines = [f"Similar to {artist} - {title}:"]
|
||||
for t in similar[:8]:
|
||||
t_artist = t.get("artist", {}).get("name", "")
|
||||
t_name = t.get("name", "?")
|
||||
match = _fmt_match(t.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {t_artist} - {t_name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
return
|
||||
|
||||
# -- Artist-level results (Last.fm) --
|
||||
if similar_artists:
|
||||
search_artist = artist or title
|
||||
if play_mode:
|
||||
# Pick a random similar artist and search YouTube
|
||||
pick = random.choice(similar_artists[:10])
|
||||
pick_name = pick.get("name", "")
|
||||
if not pick_name:
|
||||
await bot.reply(message, "No playable result found")
|
||||
return
|
||||
# Inject a !play command with a YouTube search
|
||||
message.text = f"!play {pick_name}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
|
||||
# Display similar artists
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for a in similar_artists[:8]:
|
||||
name = a.get("name", "?")
|
||||
@@ -205,30 +309,27 @@ async def cmd_similar(bot, message):
|
||||
await bot.long_reply(message, lines, label="similar artists")
|
||||
return
|
||||
|
||||
# Track-level results
|
||||
if play_mode:
|
||||
pick = random.choice(similar[:10])
|
||||
pick_artist = pick.get("artist", {}).get("name", "")
|
||||
pick_title = pick.get("name", "")
|
||||
search = f"{pick_artist} {pick_title}".strip()
|
||||
if not search:
|
||||
await bot.reply(message, "No playable result found")
|
||||
# -- MusicBrainz results --
|
||||
if mb_results:
|
||||
search_artist = artist or title
|
||||
if play_mode:
|
||||
pick = random.choice(mb_results[:10])
|
||||
search = f"{pick['artist']} {pick['title']}".strip()
|
||||
message.text = f"!play {search}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
return
|
||||
message.text = f"!play {search}"
|
||||
music_mod = bot.registry._modules.get("music")
|
||||
if music_mod:
|
||||
await music_mod.cmd_play(bot, message)
|
||||
|
||||
lines = [f"Similar to {search_artist}:"]
|
||||
for r in mb_results[:8]:
|
||||
lines.append(f" {r['artist']} - {r['title']}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
return
|
||||
|
||||
# Display similar tracks
|
||||
lines = [f"Similar to {artist} - {title}:"]
|
||||
for t in similar[:8]:
|
||||
t_artist = t.get("artist", {}).get("name", "")
|
||||
t_name = t.get("name", "?")
|
||||
match = _fmt_match(t.get("match", ""))
|
||||
suffix = f" ({match})" if match else ""
|
||||
lines.append(f" {t_artist} - {t_name}{suffix}")
|
||||
await bot.long_reply(message, lines, label="similar tracks")
|
||||
# Nothing found
|
||||
search_artist = artist or title
|
||||
await bot.reply(message, f"No similar artists found for '{search_artist}'")
|
||||
|
||||
|
||||
@command("tags", help="Music: !tags [artist] -- show genre tags")
|
||||
@@ -240,9 +341,6 @@ async def cmd_tags(bot, message):
|
||||
!tags <artist> Tags for named artist
|
||||
"""
|
||||
api_key = _get_api_key(bot)
|
||||
if not api_key:
|
||||
await bot.reply(message, "Last.fm API key not configured")
|
||||
return
|
||||
|
||||
parts = message.text.split(None, 1)
|
||||
query = parts[1].strip() if len(parts) > 1 else ""
|
||||
@@ -259,9 +357,28 @@ async def cmd_tags(bot, message):
|
||||
await bot.reply(message, "Nothing playing and no artist given")
|
||||
return
|
||||
|
||||
tags = await loop.run_in_executor(
|
||||
None, _get_top_tags, api_key, artist,
|
||||
)
|
||||
# -- Last.fm path --
|
||||
tags = []
|
||||
if api_key:
|
||||
tags = await loop.run_in_executor(
|
||||
None, _get_top_tags, api_key, artist,
|
||||
)
|
||||
|
||||
# -- MusicBrainz fallback --
|
||||
if not tags:
|
||||
try:
|
||||
from plugins._musicbrainz import mb_artist_tags, mb_search_artist
|
||||
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
|
||||
if mbid:
|
||||
mb_tags = await loop.run_in_executor(
|
||||
None, mb_artist_tags, mbid,
|
||||
)
|
||||
if mb_tags:
|
||||
await bot.reply(message, f"{artist}: {', '.join(mb_tags)}")
|
||||
return
|
||||
except Exception:
|
||||
log.warning("lastfm: MusicBrainz tag fallback failed",
|
||||
exc_info=True)
|
||||
|
||||
if not tags:
|
||||
await bot.reply(message, f"No tags found for '{artist}'")
|
||||
|
||||
276
plugins/mumble_admin.py
Normal file
276
plugins/mumble_admin.py
Normal file
@@ -0,0 +1,276 @@
|
||||
"""Plugin: Mumble server administration via chat commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from derp.plugin import command
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _find_user(bot, name: str):
|
||||
"""Case-insensitive user lookup by name. Returns pymumble User or None."""
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return None
|
||||
lower = name.lower()
|
||||
for sid in list(mumble.users):
|
||||
user = mumble.users[sid]
|
||||
if user["name"].lower() == lower:
|
||||
return user
|
||||
return None
|
||||
|
||||
|
||||
def _find_channel(bot, name: str):
|
||||
"""Case-insensitive channel lookup by name. Returns pymumble Channel or None."""
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return None
|
||||
lower = name.lower()
|
||||
for cid in list(mumble.channels):
|
||||
chan = mumble.channels[cid]
|
||||
if chan["name"].lower() == lower:
|
||||
return chan
|
||||
return None
|
||||
|
||||
|
||||
def _channel_name(bot, channel_id: int) -> str:
|
||||
"""Resolve a channel ID to its name, or return the ID as string."""
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return str(channel_id)
|
||||
chan = mumble.channels.get(channel_id)
|
||||
if chan is None:
|
||||
return str(channel_id)
|
||||
return chan["name"]
|
||||
|
||||
|
||||
# -- Sub-handlers ------------------------------------------------------------
|
||||
|
||||
|
||||
async def _sub_kick(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu kick <user> [reason]")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
reason = " ".join(args[1:]) if len(args) > 1 else ""
|
||||
user.kick(reason)
|
||||
await bot.reply(message, f"Kicked {user['name']}")
|
||||
|
||||
|
||||
async def _sub_ban(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu ban <user> [reason]")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
reason = " ".join(args[1:]) if len(args) > 1 else ""
|
||||
user.ban(reason)
|
||||
await bot.reply(message, f"Banned {user['name']}")
|
||||
|
||||
|
||||
async def _sub_mute(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu mute <user>")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
user.mute()
|
||||
await bot.reply(message, f"Muted {user['name']}")
|
||||
|
||||
|
||||
async def _sub_unmute(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu unmute <user>")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
user.unmute()
|
||||
await bot.reply(message, f"Unmuted {user['name']}")
|
||||
|
||||
|
||||
async def _sub_deafen(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu deafen <user>")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
user.deafen()
|
||||
await bot.reply(message, f"Deafened {user['name']}")
|
||||
|
||||
|
||||
async def _sub_undeafen(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu undeafen <user>")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
user.undeafen()
|
||||
await bot.reply(message, f"Undeafened {user['name']}")
|
||||
|
||||
|
||||
async def _sub_move(bot, message, args: list[str]) -> None:
|
||||
if len(args) < 2:
|
||||
await bot.reply(message, "Usage: !mu move <user> <channel>")
|
||||
return
|
||||
user = _find_user(bot, args[0])
|
||||
if user is None:
|
||||
await bot.reply(message, f"User not found: {args[0]}")
|
||||
return
|
||||
chan = _find_channel(bot, " ".join(args[1:]))
|
||||
if chan is None:
|
||||
await bot.reply(message, f"Channel not found: {' '.join(args[1:])}")
|
||||
return
|
||||
user.move_in(chan["channel_id"])
|
||||
await bot.reply(message, f"Moved {user['name']} to {chan['name']}")
|
||||
|
||||
|
||||
async def _sub_users(bot, message, args: list[str]) -> None:
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return
|
||||
bots = getattr(bot.registry, "_bots", {})
|
||||
lines: list[str] = []
|
||||
for sid in sorted(mumble.users):
|
||||
user = mumble.users[sid]
|
||||
name = user["name"]
|
||||
flags: list[str] = []
|
||||
if name in bots:
|
||||
flags.append("bot")
|
||||
if user.get("mute") or user.get("self_mute"):
|
||||
flags.append("muted")
|
||||
if user.get("deaf") or user.get("self_deaf"):
|
||||
flags.append("deaf")
|
||||
chan = _channel_name(bot, user.get("channel_id", 0))
|
||||
tag = f" [{', '.join(flags)}]" if flags else ""
|
||||
lines.append(f" {name} in {chan}{tag}")
|
||||
header = f"Online: {len(lines)} user(s)"
|
||||
await bot.reply(message, header + "\n" + "\n".join(lines))
|
||||
|
||||
|
||||
async def _sub_channels(bot, message, args: list[str]) -> None:
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return
|
||||
lines: list[str] = []
|
||||
for cid in sorted(mumble.channels):
|
||||
chan = mumble.channels[cid]
|
||||
name = chan["name"]
|
||||
# Count users in this channel
|
||||
count = sum(
|
||||
1 for sid in mumble.users
|
||||
if mumble.users[sid].get("channel_id") == cid
|
||||
)
|
||||
lines.append(f" {name} ({count})")
|
||||
await bot.reply(message, "Channels:\n" + "\n".join(lines))
|
||||
|
||||
|
||||
async def _sub_mkchan(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu mkchan <name> [temp]")
|
||||
return
|
||||
mumble = getattr(bot, "_mumble", None)
|
||||
if mumble is None:
|
||||
return
|
||||
name = args[0]
|
||||
temp = len(args) > 1 and args[1].lower() in ("temp", "temporary", "true")
|
||||
mumble.channels.new_channel(0, name, temporary=temp)
|
||||
label = " (temporary)" if temp else ""
|
||||
await bot.reply(message, f"Created channel: {name}{label}")
|
||||
|
||||
|
||||
async def _sub_rmchan(bot, message, args: list[str]) -> None:
|
||||
if not args:
|
||||
await bot.reply(message, "Usage: !mu rmchan <channel>")
|
||||
return
|
||||
chan = _find_channel(bot, " ".join(args))
|
||||
if chan is None:
|
||||
await bot.reply(message, f"Channel not found: {' '.join(args)}")
|
||||
return
|
||||
name = chan["name"]
|
||||
chan.remove()
|
||||
await bot.reply(message, f"Removed channel: {name}")
|
||||
|
||||
|
||||
async def _sub_rename(bot, message, args: list[str]) -> None:
|
||||
if len(args) < 2:
|
||||
await bot.reply(message, "Usage: !mu rename <channel> <new-name>")
|
||||
return
|
||||
chan = _find_channel(bot, args[0])
|
||||
if chan is None:
|
||||
await bot.reply(message, f"Channel not found: {args[0]}")
|
||||
return
|
||||
old = chan["name"]
|
||||
chan.rename_channel(args[1])
|
||||
await bot.reply(message, f"Renamed {old} to {args[1]}")
|
||||
|
||||
|
||||
async def _sub_desc(bot, message, args: list[str]) -> None:
|
||||
if len(args) < 2:
|
||||
await bot.reply(message, "Usage: !mu desc <channel> <text>")
|
||||
return
|
||||
chan = _find_channel(bot, args[0])
|
||||
if chan is None:
|
||||
await bot.reply(message, f"Channel not found: {args[0]}")
|
||||
return
|
||||
text = " ".join(args[1:])
|
||||
chan.set_channel_description(text)
|
||||
await bot.reply(message, f"Set description for {chan['name']}")
|
||||
|
||||
|
||||
# -- Dispatch table ----------------------------------------------------------
|
||||
|
||||
|
||||
_SUBS: dict[str, object] = {
|
||||
"kick": _sub_kick,
|
||||
"ban": _sub_ban,
|
||||
"mute": _sub_mute,
|
||||
"unmute": _sub_unmute,
|
||||
"deafen": _sub_deafen,
|
||||
"undeafen": _sub_undeafen,
|
||||
"move": _sub_move,
|
||||
"users": _sub_users,
|
||||
"channels": _sub_channels,
|
||||
"mkchan": _sub_mkchan,
|
||||
"rmchan": _sub_rmchan,
|
||||
"rename": _sub_rename,
|
||||
"desc": _sub_desc,
|
||||
}
|
||||
|
||||
_USAGE = (
|
||||
"Usage: !mu <action> [args]\n"
|
||||
"Actions: kick, ban, mute, unmute, deafen, undeafen, move, "
|
||||
"users, channels, mkchan, rmchan, rename, desc"
|
||||
)
|
||||
|
||||
|
||||
@command("mu", help="Mumble admin: !mu <action> [args]", tier="admin")
|
||||
async def cmd_mu(bot, message):
|
||||
"""Mumble server administration commands."""
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, _USAGE)
|
||||
return
|
||||
sub = parts[1].lower()
|
||||
handler = _SUBS.get(sub)
|
||||
if handler is None:
|
||||
await bot.reply(message, _USAGE)
|
||||
return
|
||||
await handler(bot, message, parts[2:])
|
||||
265
plugins/music.py
265
plugins/music.py
@@ -58,6 +58,8 @@ def _ps(bot):
|
||||
"history": [],
|
||||
"autoplay": cfg.get("autoplay", True),
|
||||
"autoplay_cooldown": cfg.get("autoplay_cooldown", 30),
|
||||
"discover": cfg.get("discover", True),
|
||||
"discover_ratio": cfg.get("discover_ratio", 3),
|
||||
"announce": cfg.get("announce", False),
|
||||
"paused": None,
|
||||
"_watcher_task": None,
|
||||
@@ -575,17 +577,64 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
|
||||
seek_req = [None]
|
||||
ps["seek_req"] = seek_req
|
||||
_autoplay_pool: list[_Track] = [] # shuffled deck, refilled each cycle
|
||||
_discover_seen: set[str] = set() # "artist:title" dedup within session
|
||||
_autoplay_count: int = 0 # autoplay picks since loop start
|
||||
try:
|
||||
while ps["queue"] or ps.get("autoplay"):
|
||||
# Autoplay: cooldown + silence wait, then pick next from shuffled deck
|
||||
if not ps["queue"]:
|
||||
if not _autoplay_pool:
|
||||
kept = _load_kept_tracks(bot)
|
||||
if not kept:
|
||||
break
|
||||
random.shuffle(kept)
|
||||
_autoplay_pool = kept
|
||||
log.info("music: autoplay shuffled %d kept tracks", len(kept))
|
||||
_autoplay_count += 1
|
||||
|
||||
# -- Discovery attempt on every Nth autoplay pick --
|
||||
discovered = False
|
||||
ratio = ps.get("discover_ratio", 3)
|
||||
if (ps.get("discover") and ratio > 0
|
||||
and _autoplay_count % ratio == 0
|
||||
and ps["history"]):
|
||||
last = ps["history"][-1]
|
||||
try:
|
||||
lfm = bot.registry._modules.get("lastfm")
|
||||
if lfm and hasattr(lfm, "discover_similar"):
|
||||
pair = await lfm.discover_similar(bot, last.title)
|
||||
if pair:
|
||||
a, t = pair
|
||||
key = f"{a.lower()}:{t.lower()}"
|
||||
if key not in _discover_seen:
|
||||
_discover_seen.add(key)
|
||||
loop = asyncio.get_running_loop()
|
||||
res = await loop.run_in_executor(
|
||||
None, _resolve_tracks,
|
||||
f"{a} {t}", 1,
|
||||
)
|
||||
if res:
|
||||
discovered = True
|
||||
pick = _Track(
|
||||
url=res[0][0], title=res[0][1],
|
||||
requester="discover",
|
||||
)
|
||||
log.info(
|
||||
"music: discovered '%s' "
|
||||
"similar to '%s'",
|
||||
pick.title, last.title,
|
||||
)
|
||||
except Exception:
|
||||
log.warning(
|
||||
"music: discovery failed, using kept deck",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# -- Kept-deck fallback --
|
||||
if not discovered:
|
||||
if not _autoplay_pool:
|
||||
kept = _load_kept_tracks(bot)
|
||||
if not kept:
|
||||
break
|
||||
random.shuffle(kept)
|
||||
_autoplay_pool = kept
|
||||
log.info("music: autoplay shuffled %d kept tracks",
|
||||
len(kept))
|
||||
pick = _autoplay_pool.pop(0)
|
||||
|
||||
cooldown = ps.get("autoplay_cooldown", 30)
|
||||
log.info("music: autoplay cooldown %ds before next track",
|
||||
cooldown)
|
||||
@@ -602,9 +651,8 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
|
||||
# Re-check: someone may have queued something or stopped
|
||||
if ps["queue"]:
|
||||
continue
|
||||
pick = _autoplay_pool.pop(0)
|
||||
ps["queue"].append(pick)
|
||||
log.info("music: autoplay picked '%s' (%d remaining)",
|
||||
log.info("music: autoplay queued '%s' (%d pool remaining)",
|
||||
pick.title, len(_autoplay_pool))
|
||||
track = ps["queue"].pop(0)
|
||||
ps["current"] = track
|
||||
@@ -1194,15 +1242,30 @@ async def cmd_queue(bot, message):
|
||||
ps = _ps(bot)
|
||||
lines = []
|
||||
if ps["current"]:
|
||||
track = ps["current"]
|
||||
progress = ps.get("progress")
|
||||
cur_seek = ps.get("cur_seek", 0.0)
|
||||
elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0)
|
||||
pos = _fmt_time(elapsed)
|
||||
if track.duration > 0:
|
||||
pos = f"{pos}/{_fmt_time(track.duration)}"
|
||||
lines.append(
|
||||
f"Now: {_truncate(ps['current'].title)}"
|
||||
f" [{ps['current'].requester}]"
|
||||
f"Now: {_truncate(track.title)}"
|
||||
f" [{track.requester}] ({pos})"
|
||||
)
|
||||
if ps["queue"]:
|
||||
total_dur = 0.0
|
||||
for i, track in enumerate(ps["queue"], 1):
|
||||
dur = f" ({_fmt_time(track.duration)})" if track.duration > 0 else ""
|
||||
total_dur += track.duration
|
||||
lines.append(
|
||||
f" {i}. {_truncate(track.title)} [{track.requester}]"
|
||||
f" {i}. {_truncate(track.title)} [{track.requester}]{dur}"
|
||||
)
|
||||
count = len(ps["queue"])
|
||||
footer = f"Queue: {count} track{'s' if count != 1 else ''}"
|
||||
if total_dur > 0:
|
||||
footer += f", {_fmt_time(total_dur)} total"
|
||||
lines.append(footer)
|
||||
else:
|
||||
if not ps["current"]:
|
||||
lines.append("Queue empty")
|
||||
@@ -1659,6 +1722,184 @@ async def _kept_repair(bot, message) -> None:
|
||||
await bot.reply(message, msg)
|
||||
|
||||
|
||||
@command("playlist", help="Music: !playlist save|load|list|del <name>")
|
||||
async def cmd_playlist(bot, message):
|
||||
"""Save, load, list, delete, import, or show named playlists.
|
||||
|
||||
Usage:
|
||||
!playlist save <name> Save current + queued tracks as a playlist
|
||||
!playlist load <name> [shuffle] Append saved playlist to queue
|
||||
!playlist list Show saved playlists with track counts
|
||||
!playlist del <name> Delete a saved playlist
|
||||
!playlist import <name> <url> Import tracks from URL as a named playlist
|
||||
!playlist show <name> Display tracks in a saved playlist
|
||||
"""
|
||||
if not _is_mumble(bot):
|
||||
await bot.reply(message, "Mumble-only feature")
|
||||
return
|
||||
|
||||
parts = message.text.split()
|
||||
if len(parts) < 2:
|
||||
await bot.reply(
|
||||
message, "Usage: !playlist save|load|list|del|import|show <name>",
|
||||
)
|
||||
return
|
||||
|
||||
sub = parts[1].lower()
|
||||
|
||||
if sub == "save":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !playlist save <name>")
|
||||
return
|
||||
name = parts[2].lower()
|
||||
ps = _ps(bot)
|
||||
entries = []
|
||||
if ps["current"]:
|
||||
t = ps["current"]
|
||||
entries.append({"url": t.url, "title": t.title,
|
||||
"requester": t.requester})
|
||||
for t in ps["queue"]:
|
||||
entries.append({"url": t.url, "title": t.title,
|
||||
"requester": t.requester})
|
||||
if not entries:
|
||||
await bot.reply(message, "Nothing to save")
|
||||
return
|
||||
bot.state.set("music", f"playlist:{name}", json.dumps(entries))
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Saved playlist '{name}' ({len(entries)} track"
|
||||
f"{'s' if len(entries) != 1 else ''})",
|
||||
)
|
||||
|
||||
elif sub == "load":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !playlist load <name> [shuffle]")
|
||||
return
|
||||
name = parts[2].lower()
|
||||
shuffle = len(parts) >= 4 and parts[3].lower() == "shuffle"
|
||||
raw = bot.state.get("music", f"playlist:{name}")
|
||||
if not raw:
|
||||
await bot.reply(message, f"No playlist named '{name}'")
|
||||
return
|
||||
try:
|
||||
entries = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
await bot.reply(message, f"Corrupt playlist '{name}'")
|
||||
return
|
||||
ps = _ps(bot)
|
||||
was_idle = ps["current"] is None
|
||||
added = 0
|
||||
for e in entries:
|
||||
if len(ps["queue"]) >= _MAX_QUEUE:
|
||||
break
|
||||
ps["queue"].append(_Track(
|
||||
url=e["url"], title=e.get("title", e["url"]),
|
||||
requester=e.get("requester", "?"),
|
||||
))
|
||||
added += 1
|
||||
if shuffle and ps["queue"]:
|
||||
random.shuffle(ps["queue"])
|
||||
suffix = " (shuffled)" if shuffle else ""
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Loaded '{name}': {added} track{'s' if added != 1 else ''}{suffix}",
|
||||
)
|
||||
if was_idle:
|
||||
_ensure_loop(bot)
|
||||
|
||||
elif sub == "list":
|
||||
names = []
|
||||
for key in bot.state.keys("music"):
|
||||
if not key.startswith("playlist:"):
|
||||
continue
|
||||
pname = key.split(":", 1)[1]
|
||||
raw = bot.state.get("music", key)
|
||||
count = 0
|
||||
if raw:
|
||||
try:
|
||||
count = len(json.loads(raw))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
names.append((pname, count))
|
||||
if not names:
|
||||
await bot.reply(message, "No saved playlists")
|
||||
return
|
||||
names.sort()
|
||||
lines = [f"Playlists ({len(names)}):"]
|
||||
for pname, count in names:
|
||||
lines.append(
|
||||
f" {pname} ({count} track{'s' if count != 1 else ''})",
|
||||
)
|
||||
for line in lines:
|
||||
await bot.reply(message, line)
|
||||
|
||||
elif sub == "del":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !playlist del <name>")
|
||||
return
|
||||
name = parts[2].lower()
|
||||
raw = bot.state.get("music", f"playlist:{name}")
|
||||
if not raw:
|
||||
await bot.reply(message, f"No playlist named '{name}'")
|
||||
return
|
||||
bot.state.delete("music", f"playlist:{name}")
|
||||
await bot.reply(message, f"Deleted playlist '{name}'")
|
||||
|
||||
elif sub == "import":
|
||||
if len(parts) < 4:
|
||||
await bot.reply(message, "Usage: !playlist import <name> <url>")
|
||||
return
|
||||
name = parts[2].lower()
|
||||
url = parts[3]
|
||||
await bot.reply(message, f"Importing '{name}' from URL...")
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
resolved = await loop.run_in_executor(None, _resolve_tracks, url)
|
||||
except Exception:
|
||||
await bot.reply(message, "Failed to resolve URL")
|
||||
return
|
||||
if not resolved:
|
||||
await bot.reply(message, "No tracks found")
|
||||
return
|
||||
requester = message.nick or "?"
|
||||
entries = [{"url": u, "title": t, "requester": requester}
|
||||
for u, t in resolved]
|
||||
bot.state.set("music", f"playlist:{name}", json.dumps(entries))
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Imported playlist '{name}' ({len(entries)} track"
|
||||
f"{'s' if len(entries) != 1 else ''})",
|
||||
)
|
||||
|
||||
elif sub == "show":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !playlist show <name>")
|
||||
return
|
||||
name = parts[2].lower()
|
||||
raw = bot.state.get("music", f"playlist:{name}")
|
||||
if not raw:
|
||||
await bot.reply(message, f"No playlist named '{name}'")
|
||||
return
|
||||
try:
|
||||
entries = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
await bot.reply(message, f"Corrupt playlist '{name}'")
|
||||
return
|
||||
if not entries:
|
||||
await bot.reply(message, f"Playlist '{name}' is empty")
|
||||
return
|
||||
lines = [f"Playlist '{name}' ({len(entries)} tracks):"]
|
||||
for i, e in enumerate(entries, 1):
|
||||
title = _truncate(e.get("title", e["url"]))
|
||||
lines.append(f" {i:>2}. {title}")
|
||||
await bot.long_reply(message, lines, label=name)
|
||||
|
||||
else:
|
||||
await bot.reply(
|
||||
message, "Usage: !playlist save|load|list|del|import|show <name>",
|
||||
)
|
||||
|
||||
|
||||
# -- Plugin lifecycle --------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
@@ -47,9 +47,12 @@ _PIPER_URL = "http://192.168.129.9:5100/"
|
||||
def _ps(bot):
|
||||
"""Per-bot plugin runtime state."""
|
||||
cfg = getattr(bot, "config", {}).get("voice", {})
|
||||
trigger = cfg.get("trigger", "")
|
||||
# Bias Whisper toward the trigger word unless explicitly configured
|
||||
default_prompt = f"{trigger.capitalize()}, " if trigger else ""
|
||||
return bot._pstate.setdefault("voice", {
|
||||
"listen": False,
|
||||
"trigger": cfg.get("trigger", ""),
|
||||
"trigger": trigger,
|
||||
"buffers": {}, # {username: bytearray}
|
||||
"last_ts": {}, # {username: float monotonic}
|
||||
"flush_task": None,
|
||||
@@ -62,6 +65,7 @@ def _ps(bot):
|
||||
"noise_scale": cfg.get("noise_scale", 0.667),
|
||||
"noise_w": cfg.get("noise_w", 0.8),
|
||||
"fx": cfg.get("fx", ""),
|
||||
"initial_prompt": cfg.get("initial_prompt", default_prompt),
|
||||
"_listener_registered": False,
|
||||
})
|
||||
|
||||
@@ -170,8 +174,17 @@ def _transcribe(ps, pcm: bytes) -> str:
|
||||
).encode() + wav_data + (
|
||||
f"\r\n--{boundary}\r\n"
|
||||
f'Content-Disposition: form-data; name="response_format"\r\n\r\n'
|
||||
f"json\r\n--{boundary}--\r\n"
|
||||
f"json"
|
||||
).encode()
|
||||
# Bias Whisper toward the trigger word when configured
|
||||
prompt = ps.get("initial_prompt", "")
|
||||
if prompt:
|
||||
body += (
|
||||
f"\r\n--{boundary}\r\n"
|
||||
f'Content-Disposition: form-data; name="initial_prompt"\r\n\r\n'
|
||||
f"{prompt}"
|
||||
).encode()
|
||||
body += f"\r\n--{boundary}--\r\n".encode()
|
||||
req = urllib.request.Request(ps["whisper_url"], data=body, method="POST")
|
||||
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
|
||||
resp = _urlopen(req, timeout=30, proxy=False)
|
||||
@@ -552,27 +565,13 @@ async def cmd_audition(bot, message):
|
||||
f"{_deep},{_bass},{_echo_chamber}"),
|
||||
]
|
||||
|
||||
# Find merlin (the listener bot) -- plays the audition samples
|
||||
merlin = None
|
||||
for peer in getattr(bot.registry, "_bots", {}).values():
|
||||
if getattr(peer, "_receive_sound", False):
|
||||
merlin = peer
|
||||
break
|
||||
|
||||
await bot.reply(message, f"Auditioning {len(samples)} voice samples...")
|
||||
loop = asyncio.get_running_loop()
|
||||
from pathlib import Path
|
||||
|
||||
# Pre-generate derp's default voice (same phrase, no FX)
|
||||
derp_wav = await loop.run_in_executor(
|
||||
None, lambda: _fetch_tts_voice(piper_url, phrase),
|
||||
)
|
||||
|
||||
for i, (label, voice, sid, fx) in enumerate(samples, 1):
|
||||
announcer = merlin or bot
|
||||
await announcer.send("0", f"[{i}/{len(samples)}] {label}")
|
||||
await bot.send("0", f"[{i}/{len(samples)}] {label}")
|
||||
await asyncio.sleep(1)
|
||||
# Generate the audition sample (merlin's candidate voice)
|
||||
sample_wav = await loop.run_in_executor(
|
||||
None, lambda v=voice, s=sid, f=fx: _fetch_tts_voice(
|
||||
piper_url, phrase, voice=v, speaker_id=s,
|
||||
@@ -583,40 +582,36 @@ async def cmd_audition(bot, message):
|
||||
await bot.send("0", " (failed)")
|
||||
continue
|
||||
try:
|
||||
# Both bots speak simultaneously:
|
||||
# merlin plays the audition sample, derp plays its default voice
|
||||
merlin_done = asyncio.Event()
|
||||
derp_done = asyncio.Event()
|
||||
if merlin:
|
||||
merlin_task = asyncio.create_task(
|
||||
merlin.stream_audio(sample_wav, volume=1.0,
|
||||
on_done=merlin_done))
|
||||
derp_task = asyncio.create_task(
|
||||
bot.stream_audio(derp_wav, volume=1.0,
|
||||
on_done=derp_done))
|
||||
await asyncio.gather(merlin_task, derp_task)
|
||||
else:
|
||||
await bot.stream_audio(sample_wav, volume=1.0,
|
||||
on_done=merlin_done)
|
||||
await merlin_done.wait()
|
||||
done = asyncio.Event()
|
||||
await bot.stream_audio(sample_wav, volume=1.0, on_done=done)
|
||||
await done.wait()
|
||||
finally:
|
||||
Path(sample_wav).unlink(missing_ok=True)
|
||||
await asyncio.sleep(2)
|
||||
|
||||
if derp_wav:
|
||||
Path(derp_wav).unlink(missing_ok=True)
|
||||
announcer = merlin or bot
|
||||
await announcer.send("0", "Audition complete.")
|
||||
await bot.send("0", "Audition complete.")
|
||||
|
||||
|
||||
# -- Plugin lifecycle --------------------------------------------------------
|
||||
|
||||
|
||||
async def on_connected(bot) -> None:
|
||||
"""Re-register listener after reconnect."""
|
||||
"""Re-register listener after reconnect; play TTS greeting on first connect."""
|
||||
if not _is_mumble(bot):
|
||||
return
|
||||
ps = _ps(bot)
|
||||
if ps["listen"] or ps["trigger"]:
|
||||
_ensure_listener(bot)
|
||||
_ensure_flush_task(bot)
|
||||
|
||||
# Greet via TTS on first connection only
|
||||
greet = getattr(bot, "config", {}).get("mumble", {}).get("greet")
|
||||
if greet and not ps.get("_greeted"):
|
||||
ps["_greeted"] = True
|
||||
ready = getattr(bot, "_is_audio_ready", None)
|
||||
if ready:
|
||||
for _ in range(20):
|
||||
if ready():
|
||||
break
|
||||
await asyncio.sleep(0.5)
|
||||
bot._spawn(_tts_play(bot, greet), name="voice-greet")
|
||||
|
||||
@@ -253,7 +253,10 @@ class Bot:
|
||||
async def _loop(self) -> None:
|
||||
"""Read and dispatch messages until disconnect."""
|
||||
while self._running:
|
||||
line = await self.conn.readline()
|
||||
try:
|
||||
line = await asyncio.wait_for(self.conn.readline(), timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
if line is None:
|
||||
log.warning("server closed connection")
|
||||
return
|
||||
|
||||
@@ -10,6 +10,7 @@ import sys
|
||||
from derp import __version__
|
||||
from derp.bot import Bot
|
||||
from derp.config import build_server_configs, resolve_config
|
||||
from derp.irc import format_msg
|
||||
from derp.log import JsonFormatter
|
||||
from derp.plugin import PluginRegistry
|
||||
|
||||
@@ -37,8 +38,8 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
"--cprofile",
|
||||
metavar="PATH",
|
||||
nargs="?",
|
||||
const="derp.prof",
|
||||
help="enable cProfile; dump stats to PATH [derp.prof]",
|
||||
const="data/derp.prof",
|
||||
help="enable cProfile; dump stats to PATH [data/derp.prof]",
|
||||
)
|
||||
p.add_argument(
|
||||
"--tracemalloc",
|
||||
@@ -72,12 +73,24 @@ def _run(bots: list) -> None:
|
||||
|
||||
|
||||
def _shutdown(bots: list) -> None:
|
||||
"""Signal handler: stop all bot loops so cProfile can flush."""
|
||||
"""Signal handler: stop all bot loops and tear down connections."""
|
||||
logging.getLogger("derp").info("SIGTERM received, shutting down")
|
||||
loop = asyncio.get_running_loop()
|
||||
for bot in bots:
|
||||
bot._running = False
|
||||
if hasattr(bot, "conn"):
|
||||
asyncio.get_running_loop().create_task(bot.conn.close())
|
||||
if hasattr(bot, "conn") and bot.conn.connected:
|
||||
loop.create_task(_quit_and_close(bot))
|
||||
elif hasattr(bot, "_mumble") and bot._mumble:
|
||||
bot._mumble.stop()
|
||||
|
||||
|
||||
async def _quit_and_close(bot) -> None:
|
||||
"""Send IRC QUIT and close the connection."""
|
||||
try:
|
||||
await bot.conn.send(format_msg("QUIT", "shutting down"))
|
||||
except Exception:
|
||||
pass
|
||||
await bot.conn.close()
|
||||
|
||||
|
||||
def _dump_tracemalloc(log: logging.Logger, path: str, limit: int = 25) -> None:
|
||||
@@ -193,10 +206,32 @@ def main(argv: list[str] | None = None) -> int:
|
||||
|
||||
if args.cprofile:
|
||||
import cProfile
|
||||
import threading
|
||||
|
||||
log.info("cProfile enabled, output: %s", args.cprofile)
|
||||
cProfile.runctx("_run(bots)", globals(), {"bots": bots, "_run": _run}, args.cprofile)
|
||||
log.info("profile saved to %s", args.cprofile)
|
||||
prof = cProfile.Profile()
|
||||
prof_path = args.cprofile
|
||||
prof_interval = 10 # dump every 10 seconds
|
||||
prof_stop = threading.Event()
|
||||
|
||||
def _periodic_dump():
|
||||
while not prof_stop.wait(prof_interval):
|
||||
try:
|
||||
prof.dump_stats(prof_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
dumper = threading.Thread(target=_periodic_dump, daemon=True)
|
||||
dumper.start()
|
||||
log.info("cProfile enabled, output: %s (saves every %ds)",
|
||||
prof_path, prof_interval)
|
||||
prof.enable()
|
||||
try:
|
||||
_run(bots)
|
||||
finally:
|
||||
prof.disable()
|
||||
prof_stop.set()
|
||||
prof.dump_stats(prof_path)
|
||||
log.info("profile saved to %s", prof_path)
|
||||
else:
|
||||
_run(bots)
|
||||
|
||||
|
||||
@@ -218,11 +218,27 @@ class MumbleBot:
|
||||
self._on_sound_received,
|
||||
)
|
||||
self._mumble.set_receive_sound(self._receive_sound)
|
||||
# Raise retry interval so 2+ bots on the same IP don't trip
|
||||
# the server's autoban (default: 10 attempts / 120s).
|
||||
import pymumble_py3.mumble as _pm
|
||||
if getattr(_pm, "PYMUMBLE_CONNECTION_RETRY_INTERVAL", 0) < 15:
|
||||
_pm.PYMUMBLE_CONNECTION_RETRY_INTERVAL = 15
|
||||
self._mumble.start()
|
||||
self._mumble.is_ready()
|
||||
|
||||
def _on_connected(self) -> None:
|
||||
"""Callback from pymumble thread: connection established."""
|
||||
# Enable TCP keepalive on the control socket to prevent NAT
|
||||
# gateways from dropping the mapping during idle periods.
|
||||
try:
|
||||
import socket as _sock
|
||||
raw = self._mumble.control_socket
|
||||
raw.setsockopt(_sock.SOL_SOCKET, _sock.SO_KEEPALIVE, 1)
|
||||
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPIDLE, 10)
|
||||
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPINTVL, 5)
|
||||
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPCNT, 3)
|
||||
except Exception:
|
||||
pass
|
||||
self._connect_count += 1
|
||||
kind = "reconnected" if self._connect_count > 1 else "connected"
|
||||
session = getattr(self._mumble.users, "myself_session", "?")
|
||||
@@ -238,6 +254,16 @@ class MumbleBot:
|
||||
self._mumble.users.myself.deafen()
|
||||
except Exception:
|
||||
log.exception("mumble: failed to self-deafen on connect")
|
||||
# Self-register on first connect so the server stores the cert
|
||||
# and treats this bot as a known user (persistent identity).
|
||||
if self._connect_count == 1:
|
||||
try:
|
||||
myself = self._mumble.users.myself
|
||||
if not myself.get("user_id"):
|
||||
myself.register()
|
||||
log.info("mumble: self-registered %s", self._username)
|
||||
except Exception:
|
||||
log.debug("mumble: self-register skipped (already registered?)")
|
||||
if self._loop:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._notify_plugins_connected(), self._loop,
|
||||
@@ -268,19 +294,7 @@ class MumbleBot:
|
||||
await self._play_greet()
|
||||
|
||||
async def _play_greet(self) -> None:
|
||||
"""Speak the greeting via TTS on connect (voice only, no text)."""
|
||||
greet = self.config.get("mumble", {}).get("greet")
|
||||
if not greet:
|
||||
return
|
||||
voice_mod = self.registry._modules.get("voice")
|
||||
tts_play = getattr(voice_mod, "_tts_play", None) if voice_mod else None
|
||||
if tts_play is None:
|
||||
return
|
||||
for _ in range(20):
|
||||
if self._is_audio_ready():
|
||||
break
|
||||
await asyncio.sleep(0.5)
|
||||
self._spawn(tts_play(self, greet), name="voice-greet")
|
||||
"""No-op: greeting is now handled by the voice plugin's on_connected."""
|
||||
|
||||
def _on_disconnected(self) -> None:
|
||||
"""Callback from pymumble thread: connection lost."""
|
||||
@@ -706,8 +720,8 @@ class MumbleBot:
|
||||
pass
|
||||
|
||||
_get_vol = volume if callable(volume) else lambda: volume
|
||||
log.info("stream_audio: starting pipeline for %s (vol=%.0f%%, seek=%.1fs)",
|
||||
url, _get_vol() * 100, seek)
|
||||
log.info("stream_audio: [%s] starting pipeline for %s (vol=%.0f%%, seek=%.1fs)",
|
||||
self._username, url, _get_vol() * 100, seek)
|
||||
|
||||
def _build_cmd(seek_pos):
|
||||
seek_flag = f" -ss {seek_pos:.3f}" if seek_pos > 0 else ""
|
||||
@@ -803,15 +817,17 @@ class MumbleBot:
|
||||
if not self._is_audio_ready():
|
||||
# Disconnected -- keep reading ffmpeg at real-time pace
|
||||
if _was_feeding:
|
||||
log.warning("stream_audio: connection lost, "
|
||||
"dropping frames at %d", frames)
|
||||
log.warning("stream_audio: [%s] connection lost, "
|
||||
"dropping frames at %d",
|
||||
self._username, frames)
|
||||
_was_feeding = False
|
||||
await asyncio.sleep(0.02)
|
||||
continue
|
||||
|
||||
if not _was_feeding:
|
||||
log.info("stream_audio: connection restored, "
|
||||
"resuming feed at frame %d", frames)
|
||||
log.info("stream_audio: [%s] connection restored, "
|
||||
"resuming feed at frame %d",
|
||||
self._username, frames)
|
||||
_was_feeding = True
|
||||
|
||||
# Seek: fade-out in progress
|
||||
@@ -879,7 +895,8 @@ class MumbleBot:
|
||||
continue
|
||||
|
||||
if frames == 1:
|
||||
log.info("stream_audio: first frame fed to pymumble")
|
||||
log.info("stream_audio: [%s] first frame fed to pymumble",
|
||||
self._username)
|
||||
|
||||
# Keep buffer at most 1 second ahead
|
||||
try:
|
||||
@@ -896,7 +913,8 @@ class MumbleBot:
|
||||
await asyncio.sleep(0.1)
|
||||
except (TypeError, AttributeError):
|
||||
pass
|
||||
log.info("stream_audio: finished, %d frames", frames)
|
||||
log.info("stream_audio: [%s] finished, %d frames",
|
||||
self._username, frames)
|
||||
except asyncio.CancelledError:
|
||||
# Only clear the buffer if volume is still audible -- if a
|
||||
# fade-out has already driven _cur_vol to ~0 the remaining
|
||||
@@ -907,11 +925,12 @@ class MumbleBot:
|
||||
self._mumble.sound_output.clear_buffer()
|
||||
except Exception:
|
||||
pass
|
||||
log.info("stream_audio: cancelled at frame %d (vol=%.3f)",
|
||||
frames, _cur_vol)
|
||||
log.info("stream_audio: [%s] cancelled at frame %d (vol=%.3f)",
|
||||
self._username, frames, _cur_vol)
|
||||
raise
|
||||
except Exception:
|
||||
log.exception("stream_audio: error at frame %d", frames)
|
||||
log.exception("stream_audio: [%s] error at frame %d",
|
||||
self._username, frames)
|
||||
raise
|
||||
finally:
|
||||
try:
|
||||
|
||||
@@ -420,7 +420,7 @@ class TestExtractVideos:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=FakeResp()):
|
||||
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
|
||||
results = _search_youtube("test")
|
||||
assert len(results) == 1
|
||||
assert results[0]["id"] == "dup1"
|
||||
@@ -438,7 +438,7 @@ class TestSearchYoutube:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=FakeResp()):
|
||||
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
|
||||
results = _search_youtube("test query")
|
||||
assert len(results) == 2
|
||||
assert results[0]["id"] == "abc123"
|
||||
@@ -446,7 +446,7 @@ class TestSearchYoutube:
|
||||
|
||||
def test_http_error_propagates(self):
|
||||
import pytest
|
||||
with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
|
||||
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
|
||||
with pytest.raises(ConnectionError):
|
||||
_search_youtube("test")
|
||||
|
||||
@@ -1263,7 +1263,7 @@ class TestSearchSearx:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=FakeResp()):
|
||||
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
|
||||
results = _search_searx("test query")
|
||||
# Same response served for all categories; deduped by URL
|
||||
assert len(results) == 3
|
||||
@@ -1281,13 +1281,13 @@ class TestSearchSearx:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=FakeResp()):
|
||||
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
|
||||
results = _search_searx("nothing")
|
||||
assert results == []
|
||||
|
||||
def test_http_error_returns_empty(self):
|
||||
"""SearXNG catches per-category errors; all failing returns empty."""
|
||||
with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
|
||||
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
|
||||
results = _search_searx("test")
|
||||
assert results == []
|
||||
|
||||
|
||||
723
tests/test_lastfm.py
Normal file
723
tests/test_lastfm.py
Normal file
@@ -0,0 +1,723 @@
|
||||
"""Tests for the Last.fm music discovery plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
# -- Load plugin module directly ---------------------------------------------
|
||||
|
||||
_spec = importlib.util.spec_from_file_location("lastfm", "plugins/lastfm.py")
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["lastfm"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
|
||||
# -- Fakes -------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
def __init__(self):
|
||||
self._modules: dict = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self, *, api_key: str = "test-key"):
|
||||
self.replied: list[str] = []
|
||||
self.config: dict = {"lastfm": {"api_key": api_key}} if api_key else {}
|
||||
self._pstate: dict = {}
|
||||
self.registry = _FakeRegistry()
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def long_reply(self, message, lines: list[str], *,
|
||||
label: str = "") -> None:
|
||||
for line in lines:
|
||||
self.replied.append(line)
|
||||
|
||||
|
||||
class _Msg:
|
||||
def __init__(self, text="!similar", nick="Alice", target="0",
|
||||
is_channel=True):
|
||||
self.text = text
|
||||
self.nick = nick
|
||||
self.target = target
|
||||
self.is_channel = is_channel
|
||||
self.prefix = nick
|
||||
self.command = "PRIVMSG"
|
||||
self.params = [target, text]
|
||||
self.tags = {}
|
||||
self.raw = {}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _FakeTrack:
|
||||
url: str = ""
|
||||
title: str = ""
|
||||
requester: str = ""
|
||||
|
||||
|
||||
# -- API response fixtures ---------------------------------------------------
|
||||
|
||||
SIMILAR_ARTISTS_RESP = {
|
||||
"similarartists": {
|
||||
"artist": [
|
||||
{"name": "Artist B", "match": "0.85"},
|
||||
{"name": "Artist C", "match": "0.72"},
|
||||
{"name": "Artist D", "match": "0.60"},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
SIMILAR_TRACKS_RESP = {
|
||||
"similartracks": {
|
||||
"track": [
|
||||
{"name": "Track X", "artist": {"name": "Artist B"}, "match": "0.9"},
|
||||
{"name": "Track Y", "artist": {"name": "Artist C"}, "match": "0.7"},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
TOP_TAGS_RESP = {
|
||||
"toptags": {
|
||||
"tag": [
|
||||
{"name": "rock", "count": 100},
|
||||
{"name": "alternative", "count": 80},
|
||||
{"name": "indie", "count": 60},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
TRACK_SEARCH_RESP = {
|
||||
"results": {
|
||||
"trackmatches": {
|
||||
"track": [
|
||||
{"name": "Found Track", "artist": "Found Artist"},
|
||||
{"name": "Another", "artist": "Someone"},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestGetApiKey
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetApiKey:
|
||||
def test_from_config(self):
|
||||
bot = _FakeBot(api_key="cfg-key")
|
||||
assert _mod._get_api_key(bot) == "cfg-key"
|
||||
|
||||
def test_from_env(self):
|
||||
bot = _FakeBot(api_key="")
|
||||
with patch.dict("os.environ", {"LASTFM_API_KEY": "env-key"}):
|
||||
assert _mod._get_api_key(bot) == "env-key"
|
||||
|
||||
def test_env_takes_priority(self):
|
||||
bot = _FakeBot(api_key="cfg-key")
|
||||
with patch.dict("os.environ", {"LASTFM_API_KEY": "env-key"}):
|
||||
assert _mod._get_api_key(bot) == "env-key"
|
||||
|
||||
def test_empty_when_unset(self):
|
||||
bot = _FakeBot(api_key="")
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
assert _mod._get_api_key(bot) == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestApiCall
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestApiCall:
|
||||
def test_parses_json(self):
|
||||
resp = MagicMock()
|
||||
resp.read.return_value = b'{"result": "ok"}'
|
||||
with patch.object(_mod, "urlopen", create=True, return_value=resp):
|
||||
# _api_call imports urlopen from derp.http at call time
|
||||
with patch("derp.http.urlopen", return_value=resp):
|
||||
data = _mod._api_call("key", "artist.getSimilar", artist="X")
|
||||
assert data == {"result": "ok"}
|
||||
|
||||
def test_returns_empty_on_error(self):
|
||||
with patch("derp.http.urlopen", side_effect=ConnectionError("fail")):
|
||||
data = _mod._api_call("key", "artist.getSimilar", artist="X")
|
||||
assert data == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestGetSimilarArtists
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetSimilarArtists:
|
||||
def test_returns_list(self):
|
||||
with patch.object(_mod, "_api_call", return_value=SIMILAR_ARTISTS_RESP):
|
||||
result = _mod._get_similar_artists("key", "Artist A")
|
||||
assert len(result) == 3
|
||||
assert result[0]["name"] == "Artist B"
|
||||
|
||||
def test_single_dict_wrapped(self):
|
||||
"""Single artist result (dict instead of list) gets wrapped."""
|
||||
data = {"similarartists": {"artist": {"name": "Solo", "match": "1.0"}}}
|
||||
with patch.object(_mod, "_api_call", return_value=data):
|
||||
result = _mod._get_similar_artists("key", "X")
|
||||
assert len(result) == 1
|
||||
assert result[0]["name"] == "Solo"
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch.object(_mod, "_api_call", return_value={}):
|
||||
result = _mod._get_similar_artists("key", "X")
|
||||
assert result == []
|
||||
|
||||
def test_missing_key(self):
|
||||
with patch.object(_mod, "_api_call", return_value={"error": 6}):
|
||||
result = _mod._get_similar_artists("key", "X")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestGetTopTags
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetTopTags:
|
||||
def test_returns_list(self):
|
||||
with patch.object(_mod, "_api_call", return_value=TOP_TAGS_RESP):
|
||||
result = _mod._get_top_tags("key", "Artist A")
|
||||
assert len(result) == 3
|
||||
assert result[0]["name"] == "rock"
|
||||
|
||||
def test_single_dict_wrapped(self):
|
||||
data = {"toptags": {"tag": {"name": "electronic", "count": 50}}}
|
||||
with patch.object(_mod, "_api_call", return_value=data):
|
||||
result = _mod._get_top_tags("key", "X")
|
||||
assert len(result) == 1
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch.object(_mod, "_api_call", return_value={}):
|
||||
result = _mod._get_top_tags("key", "X")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestGetSimilarTracks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetSimilarTracks:
|
||||
def test_returns_list(self):
|
||||
with patch.object(_mod, "_api_call", return_value=SIMILAR_TRACKS_RESP):
|
||||
result = _mod._get_similar_tracks("key", "A", "T")
|
||||
assert len(result) == 2
|
||||
assert result[0]["name"] == "Track X"
|
||||
|
||||
def test_single_dict_wrapped(self):
|
||||
data = {"similartracks": {"track": {"name": "Solo", "artist": {"name": "X"}}}}
|
||||
with patch.object(_mod, "_api_call", return_value=data):
|
||||
result = _mod._get_similar_tracks("key", "X", "T")
|
||||
assert len(result) == 1
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch.object(_mod, "_api_call", return_value={}):
|
||||
result = _mod._get_similar_tracks("key", "X", "T")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestSearchTrack
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSearchTrack:
|
||||
def test_returns_results(self):
|
||||
with patch.object(_mod, "_api_call", return_value=TRACK_SEARCH_RESP):
|
||||
result = _mod._search_track("key", "test")
|
||||
assert len(result) == 2
|
||||
assert result[0]["name"] == "Found Track"
|
||||
|
||||
def test_single_dict_wrapped(self):
|
||||
data = {"results": {"trackmatches": {
|
||||
"track": {"name": "One", "artist": "X"},
|
||||
}}}
|
||||
with patch.object(_mod, "_api_call", return_value=data):
|
||||
result = _mod._search_track("key", "test")
|
||||
assert len(result) == 1
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch.object(_mod, "_api_call", return_value={}):
|
||||
result = _mod._search_track("key", "test")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCurrentMeta
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCurrentMeta:
|
||||
def test_no_music_state(self):
|
||||
bot = _FakeBot()
|
||||
assert _mod._current_meta(bot) == ("", "")
|
||||
|
||||
def test_no_current_track(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": None}
|
||||
assert _mod._current_meta(bot) == ("", "")
|
||||
|
||||
def test_dash_separator(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="Tool - Lateralus")}
|
||||
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
|
||||
|
||||
def test_double_dash_separator(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="Tool -- Lateralus")}
|
||||
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
|
||||
|
||||
def test_pipe_separator(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="Tool | Lateralus")}
|
||||
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
|
||||
|
||||
def test_tilde_separator(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="Tool ~ Lateralus")}
|
||||
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
|
||||
|
||||
def test_no_separator(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="Lateralus")}
|
||||
assert _mod._current_meta(bot) == ("", "Lateralus")
|
||||
|
||||
def test_empty_title(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {"current": _FakeTrack(title="")}
|
||||
assert _mod._current_meta(bot) == ("", "")
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title=" Tool - Lateralus "),
|
||||
}
|
||||
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFmtMatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFmtMatch:
|
||||
def test_float_score(self):
|
||||
assert _mod._fmt_match(0.85) == "85%"
|
||||
|
||||
def test_string_score(self):
|
||||
assert _mod._fmt_match("0.72") == "72%"
|
||||
|
||||
def test_one(self):
|
||||
assert _mod._fmt_match(1.0) == "100%"
|
||||
|
||||
def test_zero(self):
|
||||
assert _mod._fmt_match(0.0) == "0%"
|
||||
|
||||
def test_invalid(self):
|
||||
assert _mod._fmt_match("bad") == ""
|
||||
|
||||
def test_none(self):
|
||||
assert _mod._fmt_match(None) == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdSimilar
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCmdSimilar:
|
||||
def test_no_api_key_mb_fallback(self):
|
||||
"""No API key falls back to MusicBrainz for similar results."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar Tool")
|
||||
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value="mbid-123"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags",
|
||||
return_value=["rock", "metal"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Tool" in r for r in bot.replied)
|
||||
assert any("MB Artist" in r for r in bot.replied)
|
||||
|
||||
def test_no_api_key_mb_no_results(self):
|
||||
"""No API key + MusicBrainz returns nothing shows 'no similar'."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar Tool")
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value=None):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("No similar artists" in r for r in bot.replied)
|
||||
|
||||
def test_no_api_key_play_mode(self):
|
||||
"""No API key + play mode delegates to cmd_play via MB results."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!similar play Tool")
|
||||
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value="mbid-123"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags",
|
||||
return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "MB Band" in play_called[0]
|
||||
|
||||
def test_no_artist_nothing_playing(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar")
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Nothing playing" in r for r in bot.replied)
|
||||
|
||||
def test_artist_query_shows_similar(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists",
|
||||
return_value=SIMILAR_ARTISTS_RESP["similarartists"]["artist"]):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Tool" in r for r in bot.replied)
|
||||
assert any("Artist B" in r for r in bot.replied)
|
||||
|
||||
def test_track_level_similarity(self):
|
||||
"""When current track has artist + title, tries track similarity first."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
tracks = SIMILAR_TRACKS_RESP["similartracks"]["track"]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Tool - Lateralus" in r for r in bot.replied)
|
||||
assert any("Track X" in r for r in bot.replied)
|
||||
|
||||
def test_falls_back_to_artist(self):
|
||||
"""Falls back to artist similarity when no track results."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
artists = SIMILAR_ARTISTS_RESP["similarartists"]["artist"]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Tool" in r for r in bot.replied)
|
||||
|
||||
def test_no_similar_found(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Obscure Band")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=[]):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("No similar artists" in r for r in bot.replied)
|
||||
|
||||
def test_play_mode_artist(self):
|
||||
"""!similar play delegates to music cmd_play."""
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar play Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.8"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "Deftones" in play_called[0]
|
||||
|
||||
def test_play_mode_track(self):
|
||||
"""!similar play with track-level results delegates to cmd_play."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar play")
|
||||
tracks = [{"name": "Schism", "artist": {"name": "Tool"}, "match": "0.9"}]
|
||||
play_called = []
|
||||
|
||||
async def fake_play(b, m):
|
||||
play_called.append(m.text)
|
||||
|
||||
music_mod = MagicMock()
|
||||
music_mod.cmd_play = fake_play
|
||||
bot.registry._modules["music"] = music_mod
|
||||
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert len(play_called) == 1
|
||||
assert "Tool" in play_called[0]
|
||||
assert "Schism" in play_called[0]
|
||||
|
||||
def test_match_score_displayed(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!similar Tool")
|
||||
artists = [{"name": "Deftones", "match": "0.85"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("85%" in r for r in bot.replied)
|
||||
|
||||
def test_current_track_no_separator(self):
|
||||
"""Title without separator uses whole title as search artist."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!similar")
|
||||
artists = [{"name": "APC", "match": "0.7"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
|
||||
with patch.object(_mod, "_get_similar_artists", return_value=artists):
|
||||
asyncio.run(_mod.cmd_similar(bot, msg))
|
||||
assert any("Similar to Lateralus" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdTags
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCmdTags:
|
||||
def test_no_api_key_mb_fallback(self):
|
||||
"""No API key falls back to MusicBrainz for tags."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!tags Tool")
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value="mbid-123"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags",
|
||||
return_value=["rock", "progressive metal", "art rock"]):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("Tool:" in r for r in bot.replied)
|
||||
assert any("rock" in r for r in bot.replied)
|
||||
assert any("progressive metal" in r for r in bot.replied)
|
||||
|
||||
def test_no_api_key_mb_no_results(self):
|
||||
"""No API key + MusicBrainz returns nothing shows 'no tags'."""
|
||||
bot = _FakeBot(api_key="")
|
||||
msg = _Msg(text="!tags Obscure")
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist",
|
||||
return_value=None):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("No tags found" in r for r in bot.replied)
|
||||
|
||||
def test_no_artist_nothing_playing(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!tags")
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("Nothing playing" in r for r in bot.replied)
|
||||
|
||||
def test_shows_tags(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!tags Tool")
|
||||
tags = TOP_TAGS_RESP["toptags"]["tag"]
|
||||
with patch.object(_mod, "_get_top_tags", return_value=tags):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("rock" in r for r in bot.replied)
|
||||
assert any("alternative" in r for r in bot.replied)
|
||||
assert any("Tool:" in r for r in bot.replied)
|
||||
|
||||
def test_no_tags_found(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!tags Obscure")
|
||||
with patch.object(_mod, "_get_top_tags", return_value=[]):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("No tags found" in r for r in bot.replied)
|
||||
|
||||
def test_from_current_track(self):
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Tool - Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!tags")
|
||||
tags = [{"name": "prog metal", "count": 100}]
|
||||
with patch.object(_mod, "_get_top_tags", return_value=tags):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("Tool:" in r for r in bot.replied)
|
||||
assert any("prog metal" in r for r in bot.replied)
|
||||
|
||||
def test_from_current_no_separator(self):
|
||||
"""Uses full title as artist when no separator."""
|
||||
bot = _FakeBot()
|
||||
bot._pstate["music"] = {
|
||||
"current": _FakeTrack(title="Lateralus"),
|
||||
}
|
||||
msg = _Msg(text="!tags")
|
||||
tags = [{"name": "rock", "count": 50}]
|
||||
with patch.object(_mod, "_get_top_tags", return_value=tags):
|
||||
asyncio.run(_mod.cmd_tags(bot, msg))
|
||||
assert any("Lateralus:" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestParseTitle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseTitle:
|
||||
def test_dash_separator(self):
|
||||
assert _mod._parse_title("Tool - Lateralus") == ("Tool", "Lateralus")
|
||||
|
||||
def test_double_dash(self):
|
||||
assert _mod._parse_title("Tool -- Lateralus") == ("Tool", "Lateralus")
|
||||
|
||||
def test_pipe_separator(self):
|
||||
assert _mod._parse_title("Tool | Lateralus") == ("Tool", "Lateralus")
|
||||
|
||||
def test_tilde_separator(self):
|
||||
assert _mod._parse_title("Tool ~ Lateralus") == ("Tool", "Lateralus")
|
||||
|
||||
def test_no_separator(self):
|
||||
assert _mod._parse_title("Lateralus") == ("", "Lateralus")
|
||||
|
||||
def test_empty_string(self):
|
||||
assert _mod._parse_title("") == ("", "")
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
assert _mod._parse_title(" Tool - Lateralus ") == ("Tool", "Lateralus")
|
||||
|
||||
def test_first_separator_wins(self):
|
||||
"""Only the first matching separator is used."""
|
||||
assert _mod._parse_title("A - B - C") == ("A", "B - C")
|
||||
|
||||
def test_dash_priority_over_pipe(self):
|
||||
"""Dash separator is tried before pipe."""
|
||||
assert _mod._parse_title("A - B | C") == ("A", "B | C")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestDiscoverSimilar
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDiscoverSimilar:
|
||||
def test_lastfm_path(self):
|
||||
"""Returns Last.fm result when API key + results available."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
tracks = [{"name": "Found", "artist": {"name": "Band"}, "match": "0.9"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result == ("Band", "Found")
|
||||
|
||||
def test_lastfm_empty_falls_to_musicbrainz(self):
|
||||
"""Falls back to MusicBrainz when Last.fm returns nothing."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result == ("MB Artist", "MB Song")
|
||||
|
||||
def test_no_api_key_uses_musicbrainz(self):
|
||||
"""Skips Last.fm when no API key, goes straight to MusicBrainz."""
|
||||
bot = _FakeBot(api_key="")
|
||||
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
|
||||
with patch.dict("os.environ", {}, clear=True), \
|
||||
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result == ("MB Band", "MB Track")
|
||||
|
||||
def test_both_fail_returns_none(self):
|
||||
"""Returns None when both Last.fm and MusicBrainz fail."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch("plugins._musicbrainz.mb_search_artist", return_value=None):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result is None
|
||||
|
||||
def test_no_artist_returns_none(self):
|
||||
"""Returns None when title has no artist component."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Lateralus"),
|
||||
)
|
||||
assert result is None
|
||||
|
||||
def test_musicbrainz_import_error_handled(self):
|
||||
"""Gracefully handles import error for _musicbrainz module."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
|
||||
patch.dict("sys.modules", {"plugins._musicbrainz": None}):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result is None
|
||||
|
||||
def test_lastfm_exception_falls_to_musicbrainz(self):
|
||||
"""Last.fm exception triggers MusicBrainz fallback."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
mb_picks = [{"artist": "Fallback", "title": "Song"}]
|
||||
with patch.object(_mod, "_get_similar_tracks",
|
||||
side_effect=Exception("API down")), \
|
||||
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result == ("Fallback", "Song")
|
||||
|
||||
def test_lastfm_pick_missing_name_falls_to_musicbrainz(self):
|
||||
"""Falls to MB when Last.fm result has empty artist/title."""
|
||||
bot = _FakeBot(api_key="test-key")
|
||||
tracks = [{"name": "", "artist": {"name": ""}, "match": "0.9"}]
|
||||
mb_picks = [{"artist": "MB", "title": "Track"}]
|
||||
with patch.object(_mod, "_get_similar_tracks", return_value=tracks), \
|
||||
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
|
||||
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
|
||||
patch("plugins._musicbrainz.mb_find_similar_recordings",
|
||||
return_value=mb_picks):
|
||||
result = asyncio.run(
|
||||
_mod.discover_similar(bot, "Tool - Lateralus"),
|
||||
)
|
||||
assert result == ("MB", "Track")
|
||||
498
tests/test_mumble_admin.py
Normal file
498
tests/test_mumble_admin.py
Normal file
@@ -0,0 +1,498 @@
|
||||
"""Tests for the mumble_admin plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
from dataclasses import dataclass, field
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# -- Load plugin module directly ---------------------------------------------
|
||||
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"mumble_admin", "plugins/mumble_admin.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
cmd_mu = _mod.cmd_mu
|
||||
_find_user = _mod._find_user
|
||||
_find_channel = _mod._find_channel
|
||||
_channel_name = _mod._channel_name
|
||||
|
||||
|
||||
# -- Fakes -------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class _FakeMessage:
|
||||
text: str = ""
|
||||
nick: str = "admin"
|
||||
prefix: str = "admin"
|
||||
target: str = "0"
|
||||
is_channel: bool = True
|
||||
params: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
_bots: dict = field(default_factory=dict)
|
||||
|
||||
def __init__(self):
|
||||
self._bots = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self, users=None, channels=None):
|
||||
self.registry = _FakeRegistry()
|
||||
self._mumble = MagicMock()
|
||||
if users is not None:
|
||||
self._mumble.users = users
|
||||
else:
|
||||
self._mumble.users = {}
|
||||
if channels is not None:
|
||||
self._mumble.channels = channels
|
||||
self._replies: list[str] = []
|
||||
|
||||
async def reply(self, message, text):
|
||||
self._replies.append(text)
|
||||
|
||||
async def send(self, target, text):
|
||||
self._replies.append(text)
|
||||
|
||||
|
||||
def _make_user(name, channel_id=0, mute=False, deaf=False,
|
||||
self_mute=False, self_deaf=False):
|
||||
"""Create a fake pymumble user (dict with methods)."""
|
||||
u = MagicMock()
|
||||
u.__getitem__ = lambda s, k: {
|
||||
"name": name,
|
||||
"channel_id": channel_id,
|
||||
"mute": mute,
|
||||
"deaf": deaf,
|
||||
"self_mute": self_mute,
|
||||
"self_deaf": self_deaf,
|
||||
}[k]
|
||||
u.get = lambda k, d=None: {
|
||||
"name": name,
|
||||
"channel_id": channel_id,
|
||||
"mute": mute,
|
||||
"deaf": deaf,
|
||||
"self_mute": self_mute,
|
||||
"self_deaf": self_deaf,
|
||||
}.get(k, d)
|
||||
return u
|
||||
|
||||
|
||||
def _make_channel(name, channel_id=0, parent=0):
|
||||
"""Create a fake pymumble channel (dict with methods)."""
|
||||
c = MagicMock()
|
||||
c.__getitem__ = lambda s, k: {
|
||||
"name": name,
|
||||
"channel_id": channel_id,
|
||||
"parent": parent,
|
||||
}[k]
|
||||
c.get = lambda k, d=None: {
|
||||
"name": name,
|
||||
"channel_id": channel_id,
|
||||
"parent": parent,
|
||||
}.get(k, d)
|
||||
return c
|
||||
|
||||
|
||||
# -- TestFindUser ------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFindUser:
|
||||
def test_case_insensitive(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
assert _find_user(bot, "alice") is alice
|
||||
assert _find_user(bot, "ALICE") is alice
|
||||
assert _find_user(bot, "Alice") is alice
|
||||
|
||||
def test_not_found(self):
|
||||
bot = _FakeBot(users={1: _make_user("Alice")})
|
||||
assert _find_user(bot, "Bob") is None
|
||||
|
||||
def test_no_mumble(self):
|
||||
bot = _FakeBot()
|
||||
bot._mumble = None
|
||||
assert _find_user(bot, "anyone") is None
|
||||
|
||||
|
||||
# -- TestFindChannel ---------------------------------------------------------
|
||||
|
||||
|
||||
class TestFindChannel:
|
||||
def test_case_insensitive(self):
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
bot = _FakeBot(channels={0: lobby})
|
||||
assert _find_channel(bot, "lobby") is lobby
|
||||
assert _find_channel(bot, "LOBBY") is lobby
|
||||
|
||||
def test_not_found(self):
|
||||
bot = _FakeBot(channels={0: _make_channel("Lobby")})
|
||||
assert _find_channel(bot, "AFK") is None
|
||||
|
||||
def test_no_mumble(self):
|
||||
bot = _FakeBot()
|
||||
bot._mumble = None
|
||||
assert _find_channel(bot, "any") is None
|
||||
|
||||
|
||||
# -- TestChannelName ---------------------------------------------------------
|
||||
|
||||
|
||||
class TestChannelName:
|
||||
def test_resolves(self):
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
bot = _FakeBot(channels={0: lobby})
|
||||
assert _channel_name(bot, 0) == "Lobby"
|
||||
|
||||
def test_missing_returns_id(self):
|
||||
bot = _FakeBot(channels={})
|
||||
assert _channel_name(bot, 42) == "42"
|
||||
|
||||
def test_no_mumble(self):
|
||||
bot = _FakeBot()
|
||||
bot._mumble = None
|
||||
assert _channel_name(bot, 5) == "5"
|
||||
|
||||
|
||||
# -- TestDispatch ------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDispatch:
|
||||
def test_no_args_shows_usage(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert len(bot._replies) == 1
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
def test_unknown_sub_shows_usage(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu bogus")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
def test_valid_sub_routes(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu kick Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.kick.assert_called_once_with("")
|
||||
assert "Kicked" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestKick ----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestKick:
|
||||
def test_kick_user(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu kick Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.kick.assert_called_once_with("")
|
||||
assert "Kicked Alice" in bot._replies[0]
|
||||
|
||||
def test_kick_with_reason(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu kick Alice being rude")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.kick.assert_called_once_with("being rude")
|
||||
|
||||
def test_kick_user_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu kick Ghost")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_kick_no_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu kick")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestBan -----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBan:
|
||||
def test_ban_user(self):
|
||||
bob = _make_user("Bob")
|
||||
bot = _FakeBot(users={1: bob})
|
||||
msg = _FakeMessage(text="!mu ban Bob")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
bob.ban.assert_called_once_with("")
|
||||
assert "Banned Bob" in bot._replies[0]
|
||||
|
||||
def test_ban_with_reason(self):
|
||||
bob = _make_user("Bob")
|
||||
bot = _FakeBot(users={1: bob})
|
||||
msg = _FakeMessage(text="!mu ban Bob spamming")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
bob.ban.assert_called_once_with("spamming")
|
||||
|
||||
def test_ban_user_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu ban Ghost")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
|
||||
# -- TestMuteUnmute ----------------------------------------------------------
|
||||
|
||||
|
||||
class TestMuteUnmute:
|
||||
def test_mute(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu mute Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.mute.assert_called_once()
|
||||
assert "Muted" in bot._replies[0]
|
||||
|
||||
def test_unmute(self):
|
||||
alice = _make_user("Alice", mute=True)
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu unmute Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.unmute.assert_called_once()
|
||||
assert "Unmuted" in bot._replies[0]
|
||||
|
||||
def test_mute_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu mute Nobody")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_unmute_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu unmute Nobody")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
|
||||
# -- TestDeafenUndeafen ------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeafenUndeafen:
|
||||
def test_deafen(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu deafen Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.deafen.assert_called_once()
|
||||
assert "Deafened" in bot._replies[0]
|
||||
|
||||
def test_undeafen(self):
|
||||
alice = _make_user("Alice", deaf=True)
|
||||
bot = _FakeBot(users={1: alice})
|
||||
msg = _FakeMessage(text="!mu undeafen Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.undeafen.assert_called_once()
|
||||
assert "Undeafened" in bot._replies[0]
|
||||
|
||||
def test_deafen_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu deafen Nobody")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_undeafen_not_found(self):
|
||||
bot = _FakeBot(users={})
|
||||
msg = _FakeMessage(text="!mu undeafen Nobody")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
|
||||
# -- TestMove ----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMove:
|
||||
def test_move_user(self):
|
||||
alice = _make_user("Alice", channel_id=0)
|
||||
afk = _make_channel("AFK", channel_id=5)
|
||||
bot = _FakeBot(users={1: alice}, channels={0: _make_channel("Root"), 5: afk})
|
||||
msg = _FakeMessage(text="!mu move Alice AFK")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
alice.move_in.assert_called_once_with(5)
|
||||
assert "Moved Alice to AFK" in bot._replies[0]
|
||||
|
||||
def test_move_user_not_found(self):
|
||||
bot = _FakeBot(users={}, channels={5: _make_channel("AFK", channel_id=5)})
|
||||
msg = _FakeMessage(text="!mu move Ghost AFK")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "user not found" in bot._replies[0].lower()
|
||||
|
||||
def test_move_channel_not_found(self):
|
||||
alice = _make_user("Alice")
|
||||
bot = _FakeBot(users={1: alice}, channels={})
|
||||
msg = _FakeMessage(text="!mu move Alice Nowhere")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "channel not found" in bot._replies[0].lower()
|
||||
|
||||
def test_move_missing_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu move Alice")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestUsers ---------------------------------------------------------------
|
||||
|
||||
|
||||
class TestUsers:
|
||||
def test_list_users(self):
|
||||
alice = _make_user("Alice", channel_id=0)
|
||||
bob = _make_user("Bob", channel_id=0, self_mute=True)
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
bot = _FakeBot(users={1: alice, 2: bob}, channels={0: lobby})
|
||||
msg = _FakeMessage(text="!mu users")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
reply = bot._replies[0]
|
||||
assert "2 user(s)" in reply
|
||||
assert "Alice" in reply
|
||||
assert "Bob" in reply
|
||||
assert "muted" in reply
|
||||
|
||||
def test_list_with_bots(self):
|
||||
alice = _make_user("Alice", channel_id=0)
|
||||
derp = _make_user("derp", channel_id=0)
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
bot = _FakeBot(users={1: alice, 2: derp}, channels={0: lobby})
|
||||
bot.registry._bots = {"derp": MagicMock()}
|
||||
msg = _FakeMessage(text="!mu users")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
reply = bot._replies[0]
|
||||
assert "bot" in reply
|
||||
assert "2 user(s)" in reply
|
||||
|
||||
def test_deaf_flag(self):
|
||||
alice = _make_user("Alice", channel_id=0, self_deaf=True)
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
bot = _FakeBot(users={1: alice}, channels={0: lobby})
|
||||
msg = _FakeMessage(text="!mu users")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "deaf" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestChannels ------------------------------------------------------------
|
||||
|
||||
|
||||
class TestChannels:
|
||||
def test_list_channels(self):
|
||||
lobby = _make_channel("Lobby", channel_id=0)
|
||||
afk = _make_channel("AFK", channel_id=1)
|
||||
alice = _make_user("Alice", channel_id=0)
|
||||
bot = _FakeBot(users={1: alice}, channels={0: lobby, 1: afk})
|
||||
msg = _FakeMessage(text="!mu channels")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
reply = bot._replies[0]
|
||||
assert "Lobby (1)" in reply
|
||||
assert "AFK (0)" in reply
|
||||
|
||||
|
||||
# -- TestMkchan --------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMkchan:
|
||||
def test_create_channel(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu mkchan Gaming")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
bot._mumble.channels.new_channel.assert_called_once_with(
|
||||
0, "Gaming", temporary=False,
|
||||
)
|
||||
assert "Created" in bot._replies[0]
|
||||
|
||||
def test_create_temp_channel(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu mkchan Gaming temp")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
bot._mumble.channels.new_channel.assert_called_once_with(
|
||||
0, "Gaming", temporary=True,
|
||||
)
|
||||
assert "temporary" in bot._replies[0]
|
||||
|
||||
def test_missing_name(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu mkchan")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestRmchan --------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRmchan:
|
||||
def test_remove_channel(self):
|
||||
afk = _make_channel("AFK", channel_id=5)
|
||||
bot = _FakeBot(channels={5: afk})
|
||||
msg = _FakeMessage(text="!mu rmchan AFK")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
afk.remove.assert_called_once()
|
||||
assert "Removed" in bot._replies[0]
|
||||
|
||||
def test_channel_not_found(self):
|
||||
bot = _FakeBot(channels={})
|
||||
msg = _FakeMessage(text="!mu rmchan Nowhere")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_missing_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu rmchan")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestRename --------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRename:
|
||||
def test_rename_channel(self):
|
||||
afk = _make_channel("AFK", channel_id=5)
|
||||
bot = _FakeBot(channels={5: afk})
|
||||
msg = _FakeMessage(text="!mu rename AFK Chill")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
afk.rename_channel.assert_called_once_with("Chill")
|
||||
assert "Renamed" in bot._replies[0]
|
||||
|
||||
def test_channel_not_found(self):
|
||||
bot = _FakeBot(channels={})
|
||||
msg = _FakeMessage(text="!mu rename Nowhere New")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_missing_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu rename AFK")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
|
||||
|
||||
# -- TestDesc ----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDesc:
|
||||
def test_set_description(self):
|
||||
afk = _make_channel("AFK", channel_id=5)
|
||||
bot = _FakeBot(channels={5: afk})
|
||||
msg = _FakeMessage(text="!mu desc AFK Away from keyboard")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
afk.set_channel_description.assert_called_once_with("Away from keyboard")
|
||||
assert "description" in bot._replies[0].lower()
|
||||
|
||||
def test_channel_not_found(self):
|
||||
bot = _FakeBot(channels={})
|
||||
msg = _FakeMessage(text="!mu desc Nowhere some text")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "not found" in bot._replies[0].lower()
|
||||
|
||||
def test_missing_args(self):
|
||||
bot = _FakeBot()
|
||||
msg = _FakeMessage(text="!mu desc AFK")
|
||||
asyncio.run(cmd_mu(bot, msg))
|
||||
assert "Usage" in bot._replies[0]
|
||||
@@ -956,7 +956,7 @@ class TestDuckCommand:
|
||||
msg = _Msg(text="!duck")
|
||||
asyncio.run(_mod.cmd_duck(bot, msg))
|
||||
assert any("Duck:" in r for r in bot.replied)
|
||||
assert any("floor=1%" in r for r in bot.replied)
|
||||
assert any("floor=2%" in r for r in bot.replied)
|
||||
assert any("restore=30s" in r for r in bot.replied)
|
||||
|
||||
def test_toggle_on(self):
|
||||
@@ -2362,3 +2362,245 @@ class TestKeptRepair:
|
||||
stored = json.loads(raw)
|
||||
assert stored["filename"] == "song.webm"
|
||||
assert (music_dir / "song.webm").is_file()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAutoplayDiscovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAutoplayDiscovery:
|
||||
"""Tests for the discovery integration in _play_loop autoplay."""
|
||||
|
||||
def test_config_defaults(self):
|
||||
"""Default discover/discover_ratio values are set."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
assert ps["discover"] is True
|
||||
assert ps["discover_ratio"] == 3
|
||||
|
||||
def test_config_from_toml(self):
|
||||
"""Config values are read from bot config."""
|
||||
bot = _FakeBot()
|
||||
bot.config = {"music": {"discover": False, "discover_ratio": 5}}
|
||||
# Reset pstate so _ps re-reads config
|
||||
bot._pstate.clear()
|
||||
ps = _mod._ps(bot)
|
||||
assert ps["discover"] is False
|
||||
assert ps["discover_ratio"] == 5
|
||||
|
||||
def test_discovery_triggers_on_ratio(self, tmp_path):
|
||||
"""Discovery is attempted when autoplay_count is a multiple of ratio."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["autoplay"] = True
|
||||
ps["discover"] = True
|
||||
ps["discover_ratio"] = 1 # trigger every pick
|
||||
ps["autoplay_cooldown"] = 0
|
||||
ps["duck_silence"] = 0
|
||||
|
||||
# Seed history so discovery has something to reference
|
||||
ps["history"] = [
|
||||
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
|
||||
]
|
||||
|
||||
# Set up kept tracks for fallback pool
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "a.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Kept Track",
|
||||
"filename": "a.opus", "id": 1,
|
||||
}))
|
||||
|
||||
discover_called = []
|
||||
|
||||
async def fake_discover(b, title):
|
||||
discover_called.append(title)
|
||||
return ("Deftones", "Change")
|
||||
|
||||
lastfm_mod = MagicMock()
|
||||
lastfm_mod.discover_similar = fake_discover
|
||||
bot.registry._modules = {"lastfm": lastfm_mod}
|
||||
|
||||
resolved = [("https://youtube.com/watch?v=x", "Deftones - Change")]
|
||||
|
||||
async def _run():
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_resolve_tracks", return_value=resolved), \
|
||||
patch.object(_mod, "_download_track", return_value=None):
|
||||
task = asyncio.create_task(
|
||||
_mod._play_loop(bot, seek=0.0, fade_in=False),
|
||||
)
|
||||
# Let it pick a track, then cancel
|
||||
await asyncio.sleep(0.5)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_run())
|
||||
assert len(discover_called) >= 1
|
||||
assert discover_called[0] == "Tool - Lateralus"
|
||||
|
||||
def test_discovery_disabled(self, tmp_path):
|
||||
"""Discovery is skipped when discover=False."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["autoplay"] = True
|
||||
ps["discover"] = False
|
||||
ps["discover_ratio"] = 1
|
||||
ps["autoplay_cooldown"] = 0
|
||||
ps["duck_silence"] = 0
|
||||
ps["history"] = [
|
||||
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
|
||||
]
|
||||
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "a.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Kept Track",
|
||||
"filename": "a.opus", "id": 1,
|
||||
}))
|
||||
|
||||
discover_called = []
|
||||
|
||||
async def fake_discover(b, title):
|
||||
discover_called.append(title)
|
||||
return ("X", "Y")
|
||||
|
||||
lastfm_mod = MagicMock()
|
||||
lastfm_mod.discover_similar = fake_discover
|
||||
bot.registry._modules = {"lastfm": lastfm_mod}
|
||||
|
||||
async def _run():
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_download_track", return_value=None):
|
||||
task = asyncio.create_task(
|
||||
_mod._play_loop(bot, seek=0.0, fade_in=False),
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_run())
|
||||
assert discover_called == []
|
||||
|
||||
def test_discovery_dedup(self):
|
||||
"""Same discovered track is not resolved twice (dedup by seen set)."""
|
||||
# Unit-test the dedup logic directly: simulate the set-based
|
||||
# deduplication that _play_loop uses with _discover_seen.
|
||||
_discover_seen: set[str] = set()
|
||||
|
||||
def _would_resolve(artist: str, title: str) -> bool:
|
||||
key = f"{artist.lower()}:{title.lower()}"
|
||||
if key in _discover_seen:
|
||||
return False
|
||||
_discover_seen.add(key)
|
||||
return True
|
||||
|
||||
assert _would_resolve("Deftones", "Change") is True
|
||||
assert _would_resolve("Deftones", "Change") is False
|
||||
assert _would_resolve("deftones", "change") is False
|
||||
assert _would_resolve("Tool", "Sober") is True
|
||||
|
||||
def test_discovery_fallback_to_kept(self, tmp_path):
|
||||
"""Falls back to kept deck when discovery returns None."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["autoplay"] = True
|
||||
ps["discover"] = True
|
||||
ps["discover_ratio"] = 1
|
||||
ps["autoplay_cooldown"] = 0
|
||||
ps["duck_silence"] = 0
|
||||
ps["history"] = [
|
||||
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
|
||||
]
|
||||
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "a.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Kept Track",
|
||||
"filename": "a.opus", "id": 1,
|
||||
}))
|
||||
|
||||
async def fake_discover(b, title):
|
||||
return None
|
||||
|
||||
lastfm_mod = MagicMock()
|
||||
lastfm_mod.discover_similar = fake_discover
|
||||
bot.registry._modules = {"lastfm": lastfm_mod}
|
||||
|
||||
queued_titles = []
|
||||
|
||||
async def _run():
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_download_track", return_value=None):
|
||||
task = asyncio.create_task(
|
||||
_mod._play_loop(bot, seek=0.0, fade_in=False),
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
# Check what was queued -- should be kept track, not discovered
|
||||
if ps.get("current"):
|
||||
queued_titles.append(ps["current"].title)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_run())
|
||||
# The kept track should have been used as fallback
|
||||
if queued_titles:
|
||||
assert queued_titles[0] == "Kept Track"
|
||||
|
||||
def test_no_history_skips_discovery(self, tmp_path):
|
||||
"""Discovery is skipped when history is empty."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["autoplay"] = True
|
||||
ps["discover"] = True
|
||||
ps["discover_ratio"] = 1
|
||||
ps["autoplay_cooldown"] = 0
|
||||
ps["duck_silence"] = 0
|
||||
ps["history"] = []
|
||||
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "a.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"url": "https://example.com/a", "title": "Kept Track",
|
||||
"filename": "a.opus", "id": 1,
|
||||
}))
|
||||
|
||||
discover_called = []
|
||||
|
||||
async def fake_discover(b, title):
|
||||
discover_called.append(title)
|
||||
return ("X", "Y")
|
||||
|
||||
lastfm_mod = MagicMock()
|
||||
lastfm_mod.discover_similar = fake_discover
|
||||
bot.registry._modules = {"lastfm": lastfm_mod}
|
||||
|
||||
async def _run():
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_download_track", return_value=None):
|
||||
task = asyncio.create_task(
|
||||
_mod._play_loop(bot, seek=0.0, fade_in=False),
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
asyncio.run(_run())
|
||||
assert discover_called == []
|
||||
|
||||
310
tests/test_musicbrainz.py
Normal file
310
tests/test_musicbrainz.py
Normal file
@@ -0,0 +1,310 @@
|
||||
"""Tests for the MusicBrainz API helper module."""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from io import BytesIO
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# -- Load module directly ----------------------------------------------------
|
||||
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"_musicbrainz", "plugins/_musicbrainz.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["_musicbrainz"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_resp(data: dict) -> MagicMock:
|
||||
"""Create a fake HTTP response with JSON body."""
|
||||
resp = MagicMock()
|
||||
resp.read.return_value = json.dumps(data).encode()
|
||||
return resp
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMbRequest
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMbRequest:
|
||||
def setup_method(self):
|
||||
_mod._last_request = 0.0
|
||||
|
||||
def test_returns_parsed_json(self):
|
||||
resp = _make_resp({"status": "ok"})
|
||||
with patch("derp.http.urlopen", return_value=resp):
|
||||
result = _mod._mb_request("artist", {"query": "Tool"})
|
||||
assert result == {"status": "ok"}
|
||||
|
||||
def test_rate_delay_enforced(self):
|
||||
"""Second call within rate interval triggers sleep."""
|
||||
_mod._last_request = time.monotonic()
|
||||
resp = _make_resp({})
|
||||
slept = []
|
||||
with patch("derp.http.urlopen", return_value=resp), \
|
||||
patch.object(_mod.time, "sleep", side_effect=slept.append), \
|
||||
patch.object(_mod.time, "monotonic", return_value=_mod._last_request + 0.2):
|
||||
_mod._mb_request("artist", {"query": "X"})
|
||||
assert len(slept) == 1
|
||||
assert slept[0] > 0
|
||||
|
||||
def test_no_delay_when_interval_elapsed(self):
|
||||
"""No sleep when enough time has passed since last request."""
|
||||
_mod._last_request = time.monotonic() - 5.0
|
||||
resp = _make_resp({})
|
||||
with patch("derp.http.urlopen", return_value=resp), \
|
||||
patch.object(_mod.time, "sleep") as mock_sleep:
|
||||
_mod._mb_request("artist", {"query": "X"})
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
def test_returns_empty_on_error(self):
|
||||
with patch("derp.http.urlopen", side_effect=ConnectionError("fail")):
|
||||
result = _mod._mb_request("artist", {"query": "X"})
|
||||
assert result == {}
|
||||
|
||||
def test_updates_last_request_on_success(self):
|
||||
_mod._last_request = 0.0
|
||||
resp = _make_resp({})
|
||||
with patch("derp.http.urlopen", return_value=resp):
|
||||
_mod._mb_request("test")
|
||||
assert _mod._last_request > 0
|
||||
|
||||
def test_updates_last_request_on_error(self):
|
||||
_mod._last_request = 0.0
|
||||
with patch("derp.http.urlopen", side_effect=Exception("boom")):
|
||||
_mod._mb_request("test")
|
||||
assert _mod._last_request > 0
|
||||
|
||||
def test_none_params(self):
|
||||
"""Handles None params without error."""
|
||||
resp = _make_resp({"ok": True})
|
||||
with patch("derp.http.urlopen", return_value=resp):
|
||||
result = _mod._mb_request("test", None)
|
||||
assert result == {"ok": True}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMbSearchArtist
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMbSearchArtist:
|
||||
def setup_method(self):
|
||||
_mod._last_request = 0.0
|
||||
|
||||
def test_returns_mbid(self):
|
||||
data = {"artists": [{"id": "abc-123", "name": "Tool", "score": 100}]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_search_artist("Tool")
|
||||
assert result == "abc-123"
|
||||
|
||||
def test_returns_none_no_results(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={"artists": []}):
|
||||
assert _mod.mb_search_artist("Unknown") is None
|
||||
|
||||
def test_returns_none_on_empty_response(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={}):
|
||||
assert _mod.mb_search_artist("X") is None
|
||||
|
||||
def test_returns_none_low_score(self):
|
||||
"""Rejects matches with score below 50."""
|
||||
data = {"artists": [{"id": "low", "name": "Mismatch", "score": 30}]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
assert _mod.mb_search_artist("Tool") is None
|
||||
|
||||
def test_returns_none_on_error(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={}):
|
||||
assert _mod.mb_search_artist("Error") is None
|
||||
|
||||
def test_accepts_high_score(self):
|
||||
data = {"artists": [{"id": "abc", "name": "Tool", "score": 85}]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
assert _mod.mb_search_artist("Tool") == "abc"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMbArtistTags
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMbArtistTags:
|
||||
def setup_method(self):
|
||||
_mod._last_request = 0.0
|
||||
|
||||
def test_returns_sorted_top_5(self):
|
||||
data = {"tags": [
|
||||
{"name": "rock", "count": 50},
|
||||
{"name": "metal", "count": 100},
|
||||
{"name": "prog", "count": 80},
|
||||
{"name": "alternative", "count": 60},
|
||||
{"name": "hard rock", "count": 40},
|
||||
{"name": "grunge", "count": 30},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_artist_tags("mbid-123")
|
||||
assert len(result) == 5
|
||||
assert result[0] == "metal"
|
||||
assert result[1] == "prog"
|
||||
assert result[2] == "alternative"
|
||||
assert result[3] == "rock"
|
||||
assert result[4] == "hard rock"
|
||||
|
||||
def test_empty_tags(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={"tags": []}):
|
||||
assert _mod.mb_artist_tags("mbid") == []
|
||||
|
||||
def test_no_tags_key(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={}):
|
||||
assert _mod.mb_artist_tags("mbid") == []
|
||||
|
||||
def test_skips_nameless_tags(self):
|
||||
data = {"tags": [
|
||||
{"name": "rock", "count": 50},
|
||||
{"count": 100}, # no name
|
||||
{"name": "", "count": 80}, # empty name
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_artist_tags("mbid")
|
||||
assert result == ["rock"]
|
||||
|
||||
def test_fewer_than_5_tags(self):
|
||||
data = {"tags": [{"name": "jazz", "count": 10}]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_artist_tags("mbid")
|
||||
assert result == ["jazz"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMbFindSimilarRecordings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMbFindSimilarRecordings:
|
||||
def setup_method(self):
|
||||
_mod._last_request = 0.0
|
||||
|
||||
def test_returns_dicts(self):
|
||||
data = {"recordings": [
|
||||
{
|
||||
"title": "Song A",
|
||||
"artist-credit": [{"name": "Other Artist"}],
|
||||
},
|
||||
{
|
||||
"title": "Song B",
|
||||
"artist-credit": [{"name": "Another Band"}],
|
||||
},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock", "metal"],
|
||||
)
|
||||
assert len(result) == 2
|
||||
assert result[0] == {"artist": "Other Artist", "title": "Song A"}
|
||||
assert result[1] == {"artist": "Another Band", "title": "Song B"}
|
||||
|
||||
def test_excludes_original_artist(self):
|
||||
data = {"recordings": [
|
||||
{
|
||||
"title": "Own Song",
|
||||
"artist-credit": [{"name": "Tool"}],
|
||||
},
|
||||
{
|
||||
"title": "Other Song",
|
||||
"artist-credit": [{"name": "Deftones"}],
|
||||
},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock"],
|
||||
)
|
||||
assert len(result) == 1
|
||||
assert result[0]["artist"] == "Deftones"
|
||||
|
||||
def test_excludes_original_artist_case_insensitive(self):
|
||||
data = {"recordings": [
|
||||
{
|
||||
"title": "Song",
|
||||
"artist-credit": [{"name": "TOOL"}],
|
||||
},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock"],
|
||||
)
|
||||
assert result == []
|
||||
|
||||
def test_deduplicates(self):
|
||||
data = {"recordings": [
|
||||
{
|
||||
"title": "Song A",
|
||||
"artist-credit": [{"name": "Band X"}],
|
||||
},
|
||||
{
|
||||
"title": "Song A",
|
||||
"artist-credit": [{"name": "Band X"}],
|
||||
},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Other", ["rock"],
|
||||
)
|
||||
assert len(result) == 1
|
||||
|
||||
def test_empty_tags(self):
|
||||
result = _mod.mb_find_similar_recordings("Tool", [])
|
||||
assert result == []
|
||||
|
||||
def test_no_recordings(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={"recordings": []}):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock"],
|
||||
)
|
||||
assert result == []
|
||||
|
||||
def test_empty_response(self):
|
||||
with patch.object(_mod, "_mb_request", return_value={}):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock"],
|
||||
)
|
||||
assert result == []
|
||||
|
||||
def test_skips_missing_title(self):
|
||||
data = {"recordings": [
|
||||
{
|
||||
"title": "",
|
||||
"artist-credit": [{"name": "Band"}],
|
||||
},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Other", ["rock"],
|
||||
)
|
||||
assert result == []
|
||||
|
||||
def test_skips_missing_artist_credit(self):
|
||||
data = {"recordings": [
|
||||
{"title": "Song", "artist-credit": []},
|
||||
{"title": "Song2"},
|
||||
]}
|
||||
with patch.object(_mod, "_mb_request", return_value=data):
|
||||
result = _mod.mb_find_similar_recordings(
|
||||
"Other", ["rock"],
|
||||
)
|
||||
assert result == []
|
||||
|
||||
def test_uses_top_two_tags(self):
|
||||
"""Query should use at most 2 tags."""
|
||||
with patch.object(_mod, "_mb_request", return_value={}) as mock_req:
|
||||
_mod.mb_find_similar_recordings(
|
||||
"Tool", ["rock", "metal", "prog"],
|
||||
)
|
||||
call_args = mock_req.call_args
|
||||
query = call_args[1]["query"] if "query" in (call_args[1] or {}) else call_args[0][1].get("query", "")
|
||||
# Verify the query contains both tag references
|
||||
assert "rock" in query or "metal" in query
|
||||
Reference in New Issue
Block a user