Compare commits

...

16 Commits

Author SHA1 Message Date
user
135a3791e2 feat: add MusicBrainz fallback to !similar and !tags commands
Some checks failed
CI / gitleaks (push) Failing after 2s
CI / lint (push) Failing after 23s
CI / test (3.11) (push) Has been skipped
CI / test (3.12) (push) Has been skipped
CI / test (3.13) (push) Has been skipped
CI / build (push) Has been skipped
Remove early return on missing Last.fm API key. Both commands now
fall back to MusicBrainz (mb_search_artist -> mb_artist_tags ->
mb_find_similar_recordings) when no API key is configured or when
Last.fm returns empty results. Same pattern used by discover_similar.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 21:56:39 +01:00
user
a87f75adf1 feat: add Mumble server admin plugin (!mu)
Admin-only command with subcommand dispatch for server management:
kick, ban, mute/unmute, deafen/undeafen, move, users, channels,
mkchan, rmchan, rename, desc. Auto-loads on merlin via except_plugins.
2026-02-23 21:44:38 +01:00
user
da9ed51c74 feat: auto-discover similar tracks during autoplay via Last.fm/MusicBrainz
Every Nth autoplay pick (configurable via discover_ratio), query Last.fm
for similar tracks. When Last.fm has no key or returns nothing, fall back
to MusicBrainz tag-based recording search (no API key needed). Discovered
tracks are resolved via yt-dlp and deduplicated within the session. If
discovery fails, the kept-deck shuffle continues as before.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 21:19:41 +01:00
user
56f6b9822f fix: revert pymumble protocol version patch that broke audio
Reporting version 1.5.0 to mumble-server caused it to expect the newer
audio frame format (terminator bit on opus length varint). pymumble
1.6.1 does not implement this, so the server silently dropped every
UDPTunnel audio packet. Revert to native 1.2.4 -- audio works, only
side effect is a cosmetic ChannelListener warning.

Also remove --cprofile from docker-compose command.
2026-02-23 18:51:53 +01:00
user
09880624d5 fix: add bot name to stream_audio log lines
Multi-bot setups need to know which bot is streaming. Prefixes all
stream_audio log messages with [username] for clarity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 23:15:46 +01:00
user
3c475107e3 refactor: simplify audition to single-bot playback
derp now handles both listening and speaking, so audition no longer
needs cross-bot lookup or dual-play through merlin.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 23:07:08 +01:00
user
b3006b02e2 feat: auto-register mumble bots on first connect
Self-register with the Mumble server on initial connection so both
derp and merlin get persistent identities (cert-bound user_id).
Skips registration on reconnect or if already registered.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 22:32:36 +01:00
user
8b504364a9 fix: patch pymumble protocol version and harden mumble connections
- Update reported protocol version from 1.2.4 to 1.5.0 so modern
  Murmur servers treat PyMumble as a compatible client
- Fix OS string to report actual platform instead of "PyMumble 1.6.1"
  (was shown as [Invalid] by Murmur)
- Raise pymumble reconnect retry interval to 15s to prevent autoban
  when running multiple bots from the same IP
- Enable TCP keepalive on control socket (10s idle) to prevent NAT
  gateways from dropping long-lived connections

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 22:28:57 +01:00
user
40c6bf8c53 feat: playlist import, show, and shuffle-on-load
Add !playlist import <name> <url> to resolve and save tracks from a URL
without queueing them. Add !playlist show <name> to display tracks in a
saved playlist via long_reply (auto-pastes on overflow). Add optional
'shuffle' keyword to !playlist load for randomized playback order.
2026-02-22 20:31:54 +01:00
user
a76d46b1de fix: graceful SIGTERM shutdown for IRC and Mumble bots
Send IRC QUIT before closing the socket so the server releases the nick
immediately. Wrap readline() in wait_for(timeout=2.0) so the _running
check triggers even when the server is quiet. Explicitly stop pymumble
in the signal handler to accelerate Mumble teardown. Together these
eliminate the 10s podman grace-period SIGKILL.
2026-02-22 20:31:49 +01:00
user
0ffddb8e41 fix: write cProfile to data/ volume for host access
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:02:14 +01:00
user
62b01c76f7 fix: reduce cProfile dump interval to 10s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:00:06 +01:00
user
e0db0ad567 fix: periodic cProfile dump every 60s (survives hard kills)
Replace cProfile.runctx with Profile() + daemon thread that dumps
stats to disk every 60 seconds. Final dump on clean exit. Profile
data no longer lost on container stop, OOM, or SIGKILL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:56:21 +01:00
user
c41035ceca test: add test_lastfm.py (50 cases)
Covers API helpers (_get_similar_artists, _get_top_tags,
_get_similar_tracks, _search_track), config resolution, metadata
extraction from track titles, match score formatting, and both
commands (!similar, !tags) including play mode delegation, fallback
from track to artist similarity, and current-track integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:52:16 +01:00
user
cd4124e07a fix: route alert YouTube/SearXNG through pooled urlopen
- YouTube InnerTube search: urllib.request.urlopen -> _urlopen (gets
  connection pooling + SOCKS5 proxy)
- SearXNG search: urllib.request.urlopen -> _urlopen(proxy=False)
  (local service, skip proxy, get pooling)
- Update 5 tests to patch _urlopen instead of urllib.request.urlopen

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:39:38 +01:00
user
717bf59a05 feat: playlist save/load, queue durations, whisper bias, greet fix
- Move TTS greeting from mumble._play_greet to voice.on_connected
  (fires once on first connect, gated on _is_audio_ready)
- Add initial_prompt multipart field to Whisper STT for trigger word
  bias (auto-generated from trigger config, overridable)
- Enhanced !queue: elapsed/total on now-playing, per-track durations,
  footer with track count and total time
- New !playlist command: save/load/list/del named playlists via
  bot.state persistence (playlist:<name> keys)
- Fix duck floor test (1% -> 2% to match default change)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:23:03 +01:00
19 changed files with 2745 additions and 154 deletions

View File

@@ -9,8 +9,10 @@
| P0 | [x] | Instant packet-based ducking via pymumble sound callback (~20ms) |
| P0 | [x] | Duck floor raised to 2% (keep music audible during voice) |
| P0 | [x] | Strip leading punctuation from voice trigger remainder |
| P1 | [ ] | Queue display improvements (`!queue` shows position, duration, total time) |
| P1 | [ ] | Playlist save/load (`!playlist save <name>`, `!playlist load <name>`) |
| P0 | [x] | Fix greeting tests: move greet TTS to voice plugin `on_connected` |
| P0 | [x] | Whisper `initial_prompt` bias for trigger word recognition |
| P1 | [x] | Queue display improvements (`!queue` shows elapsed/duration, totals) |
| P1 | [x] | Playlist save/load/list/del (`!playlist save <name>`, etc.) |
| P2 | [ ] | Per-channel voice settings (different voice per Mumble channel) |
## Previous Sprint -- Performance: HTTP + Parsing (2026-02-22)
@@ -21,7 +23,7 @@
| P0 | [x] | `plugins/searx.py` -- route through `derp.http.urlopen(proxy=False)` |
| P1 | [x] | Connection pool: `preload_content=True` + `_PooledResponse` wrapper for connection reuse |
| P1 | [x] | Pool tuning: `num_pools=30, maxsize=8` (was 20/4) |
| P2 | [ ] | Audit remaining plugins for unnecessary proxy routing |
| P2 | [x] | Audit remaining plugins for unnecessary proxy routing |
## Previous Sprint -- Music Discovery via Last.fm (2026-02-22)
@@ -32,7 +34,7 @@
| P0 | [x] | `!similar play` -- queue a similar track via YouTube search |
| P1 | [x] | `!tags` command -- show genre/style tags for current or named track |
| P1 | [x] | Config: `[lastfm] api_key` or `LASTFM_API_KEY` env var |
| P2 | [ ] | Tests: `test_lastfm.py` (API response mocking, command dispatch) |
| P2 | [x] | Tests: `test_lastfm.py` (50 cases: API helpers, metadata, commands) |
| P2 | [ ] | Documentation update (USAGE.md, CHEATSHEET.md) |
## Previous Sprint -- v2.3.0 Mumble Voice + Multi-Bot (2026-02-22)

View File

@@ -18,4 +18,4 @@ services:
- ./secrets:/app/secrets:ro,Z
environment:
- OPENROUTER_API_KEY
command: ["--verbose", "--cprofile"]
command: ["--verbose"]

View File

@@ -574,7 +574,7 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
!prev # Go back to previous track (fades out)
!seek 1:30 # Seek to position (also +30, -30)
!resume # Resume last stopped/skipped track
!queue # Show queue
!queue # Show queue (with durations + totals)
!queue <url> # Add to queue (alias for !play)
!np # Now playing
!volume # Show current volume
@@ -587,9 +587,13 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
!duck # Show ducking status
!duck on # Enable voice ducking
!duck off # Disable voice ducking
!duck floor 5 # Set duck floor volume (0-100, default 1)
!duck floor 5 # Set duck floor volume (0-100, default 2)
!duck silence 20 # Set silence timeout seconds (default 15)
!duck restore 45 # Set restore ramp duration seconds (default 30)
!playlist save <name> # Save current + queued tracks as named playlist
!playlist load <name> # Append saved playlist to queue, start if idle
!playlist list # Show saved playlists with track counts
!playlist del <name> # Delete a saved playlist
```
Requires: `yt-dlp`, `ffmpeg`, `libopus` on the host.

View File

@@ -1623,13 +1623,17 @@ and voice transmission.
!prev Go back to the previous track (fade-out)
!seek <offset> Seek to position (1:30, 90, +30, -30)
!resume Resume last stopped/skipped track from saved position
!queue Show queue
!queue Show queue (with durations + totals)
!queue <url> Add to queue (alias for !play)
!np Now playing
!volume [0-100] Get/set volume (persisted across restarts)
!keep Keep current track's audio file (with metadata)
!kept [rm <id>|clear|repair] List, remove, clear, or repair kept files
!testtone Play 3-second 440Hz test tone
!playlist save <name> Save current + queued tracks as named playlist
!playlist load <name> Append saved playlist to queue, start if idle
!playlist list Show saved playlists with track counts
!playlist del <name> Delete a saved playlist
```
- Queue holds up to 50 tracks
@@ -1712,7 +1716,7 @@ volume gradually restores to the user-set level in small steps.
!duck Show ducking status and settings
!duck on Enable voice ducking
!duck off Disable voice ducking
!duck floor <0-100> Set floor volume % (default: 1)
!duck floor <0-100> Set floor volume % (default: 2)
!duck silence <sec> Set silence timeout in seconds (default: 15)
!duck restore <sec> Set restore ramp duration in seconds (default: 30)
```

View File

@@ -2,6 +2,7 @@
1. pymumble: ssl.wrap_socket was removed in 3.13
2. opuslib: ctypes.util.find_library fails on musl-based distros
3. pymumble: close stale socket on reconnect
"""
import pathlib
@@ -65,5 +66,8 @@ new_init = """\
self.control_socket = None"""
assert old_init in src, "pymumble init_connection socket patch target not found"
p.write_text(src.replace(old_init, new_init))
src = src.replace(old_init, new_init)
print("pymumble reconnect socket patch applied")
p.write_text(src)

118
plugins/_musicbrainz.py Normal file
View File

@@ -0,0 +1,118 @@
"""MusicBrainz API helper for music discovery fallback.
Private module (underscore prefix) -- plugin loader skips it.
All functions are blocking; callers should run them in an executor.
"""
from __future__ import annotations
import json
import logging
import time
from urllib.request import Request
log = logging.getLogger(__name__)
_BASE = "https://musicbrainz.org/ws/2"
_UA = "derp-bot/2.0.0 (https://git.mymx.me/username/derp)"
# Rate limit: MusicBrainz requires max 1 request/second.
# We use 1.1s between calls to stay well within limits.
_RATE_INTERVAL = 1.1
_last_request: float = 0.0
def _mb_request(path: str, params: dict | None = None) -> dict:
"""Rate-limited GET to MusicBrainz API. Blocking."""
global _last_request
from derp.http import urlopen
elapsed = time.monotonic() - _last_request
if elapsed < _RATE_INTERVAL:
time.sleep(_RATE_INTERVAL - elapsed)
qs = "&".join(f"{k}={v}" for k, v in (params or {}).items())
url = f"{_BASE}/{path}?fmt=json&{qs}" if qs else f"{_BASE}/{path}?fmt=json"
req = Request(url, headers={"User-Agent": _UA})
try:
resp = urlopen(req, timeout=10, proxy=False)
_last_request = time.monotonic()
return json.loads(resp.read().decode())
except Exception:
_last_request = time.monotonic()
log.warning("musicbrainz: request failed: %s", path, exc_info=True)
return {}
def mb_search_artist(name: str) -> str | None:
"""Search for an artist by name, return MBID or None."""
from urllib.parse import quote
data = _mb_request("artist", {"query": quote(name), "limit": "1"})
artists = data.get("artists", [])
if not artists:
return None
# Require a reasonable score to avoid false matches
score = artists[0].get("score", 0)
if score < 50:
return None
return artists[0].get("id")
def mb_artist_tags(mbid: str) -> list[str]:
"""Fetch top 5 tags for an artist by MBID."""
data = _mb_request(f"artist/{mbid}", {"inc": "tags"})
tags = data.get("tags", [])
if not tags:
return []
# Sort by count descending, take top 5
sorted_tags = sorted(tags, key=lambda t: t.get("count", 0), reverse=True)
return [t["name"] for t in sorted_tags[:5] if t.get("name")]
def mb_find_similar_recordings(artist: str, tags: list[str],
limit: int = 10) -> list[dict]:
"""Find recordings by other artists sharing top tags.
Searches MusicBrainz for recordings tagged with the top 2 tags,
excluding the original artist. Returns [{"artist": str, "title": str}].
"""
from urllib.parse import quote
if not tags:
return []
# Use top 2 tags for the query
tag_query = " AND ".join(f'tag:"{t}"' for t in tags[:2])
query = f'({tag_query}) AND NOT artist:"{artist}"'
data = _mb_request("recording", {
"query": quote(query),
"limit": str(limit),
})
recordings = data.get("recordings", [])
if not recordings:
return []
seen = set()
results = []
for rec in recordings:
title = rec.get("title", "")
credits = rec.get("artist-credit", [])
if not credits or not title:
continue
rec_artist = credits[0].get("name", "") if credits else ""
if not rec_artist:
continue
# Skip the original artist (case-insensitive)
if rec_artist.lower() == artist.lower():
continue
# Deduplicate by artist+title
key = f"{rec_artist.lower()}:{title.lower()}"
if key in seen:
continue
seen.add(key)
results.append({"artist": rec_artist, "title": title})
return results

View File

@@ -436,7 +436,7 @@ def _search_youtube(keyword: str) -> list[dict]:
req = urllib.request.Request(_YT_SEARCH_URL, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
raw = resp.read()
resp.close()
@@ -545,7 +545,7 @@ def _search_searx(keyword: str) -> list[dict]:
})
req = urllib.request.Request(f"{_SEARX_URL}?{params}", method="GET")
try:
resp = urllib.request.urlopen(req, timeout=_FETCH_TIMEOUT)
resp = _urlopen(req, timeout=_FETCH_TIMEOUT, proxy=False)
raw = resp.read()
resp.close()
except Exception as exc:

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import json
import logging
import os
@@ -91,6 +92,19 @@ def _search_track(api_key: str, query: str,
# -- Metadata extraction -----------------------------------------------------
def _parse_title(raw_title: str) -> tuple[str, str]:
"""Split a raw track title into (artist, title).
Tries common separators: `` - ``, `` -- ``, `` | ``, `` ~ ``.
Returns ``("", raw_title)`` if no separator is found.
"""
for sep in (" - ", " -- ", " | ", " ~ "):
if sep in raw_title:
parts = raw_title.split(sep, 1)
return (parts[0].strip(), parts[1].strip())
return ("", raw_title)
def _current_meta(bot) -> tuple[str, str]:
"""Extract artist and title from the currently playing track.
@@ -103,15 +117,60 @@ def _current_meta(bot) -> tuple[str, str]:
if current is None:
return ("", "")
raw_title = current.title or ""
return _parse_title(raw_title)
# Try common "Artist - Title" patterns
for sep in (" - ", " -- ", " | ", " ~ "):
if sep in raw_title:
parts = raw_title.split(sep, 1)
return (parts[0].strip(), parts[1].strip())
# No separator -- treat whole thing as a search query
return ("", raw_title)
# -- Discovery orchestrator --------------------------------------------------
async def discover_similar(bot, last_track_title: str) -> tuple[str, str] | None:
"""Find a similar track via Last.fm or MusicBrainz fallback.
Returns ``(artist, title)`` or ``None`` if nothing found.
"""
artist, title = _parse_title(last_track_title)
loop = asyncio.get_running_loop()
# -- Last.fm path --
api_key = _get_api_key(bot)
if api_key and artist:
try:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title, 20,
)
if similar:
pick = random.choice(similar)
pick_artist = pick.get("artist", {}).get("name", "")
pick_title = pick.get("name", "")
if pick_artist and pick_title:
return (pick_artist, pick_title)
except Exception:
log.warning("lastfm: discover via Last.fm failed", exc_info=True)
# -- MusicBrainz fallback --
if artist:
try:
from plugins._musicbrainz import (
mb_artist_tags,
mb_find_similar_recordings,
mb_search_artist,
)
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
if mbid:
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
if tags:
picks = await loop.run_in_executor(
None, mb_find_similar_recordings, artist, tags, 20,
)
if picks:
pick = random.choice(picks)
return (pick["artist"], pick["title"])
except Exception:
log.warning("lastfm: discover via MusicBrainz failed",
exc_info=True)
return None
# -- Formatting --------------------------------------------------------------
@@ -139,9 +198,6 @@ async def cmd_similar(bot, message):
!similar play <artist> Queue a similar track for named artist
"""
api_key = _get_api_key(bot)
if not api_key:
await bot.reply(message, "Last.fm API key not configured")
return
parts = message.text.split(None, 2)
# !similar play [artist]
@@ -164,38 +220,86 @@ async def cmd_similar(bot, message):
await bot.reply(message, "Nothing playing and no artist given")
return
# Try track-level similarity first if we have both artist + title
# -- Last.fm path --
similar = []
if artist and title:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title,
)
similar_artists = []
if api_key:
# Try track-level similarity first if we have both artist + title
if artist and title:
similar = await loop.run_in_executor(
None, _get_similar_tracks, api_key, artist, title,
)
# Fall back to artist-level similarity
if not similar:
search_artist = artist or title
similar_artists = await loop.run_in_executor(
None, _get_similar_artists, api_key, search_artist,
)
# Fall back to artist-level similarity
if not similar:
# -- MusicBrainz fallback --
mb_results = []
if not similar and not similar_artists:
search_artist = artist or title
similar_artists = await loop.run_in_executor(
None, _get_similar_artists, api_key, search_artist,
)
if not similar_artists:
await bot.reply(message, f"No similar artists found for '{search_artist}'")
try:
from plugins._musicbrainz import (
mb_artist_tags,
mb_find_similar_recordings,
mb_search_artist,
)
mbid = await loop.run_in_executor(
None, mb_search_artist, search_artist,
)
if mbid:
tags = await loop.run_in_executor(None, mb_artist_tags, mbid)
if tags:
mb_results = await loop.run_in_executor(
None, mb_find_similar_recordings, search_artist,
tags, 20,
)
except Exception:
log.warning("lastfm: MusicBrainz fallback failed", exc_info=True)
# -- Track-level results (Last.fm) --
if similar:
if play_mode:
pick = random.choice(similar[:10])
pick_artist = pick.get("artist", {}).get("name", "")
pick_title = pick.get("name", "")
search = f"{pick_artist} {pick_title}".strip()
if not search:
await bot.reply(message, "No playable result found")
return
message.text = f"!play {search}"
music_mod = bot.registry._modules.get("music")
if music_mod:
await music_mod.cmd_play(bot, message)
return
lines = [f"Similar to {artist} - {title}:"]
for t in similar[:8]:
t_artist = t.get("artist", {}).get("name", "")
t_name = t.get("name", "?")
match = _fmt_match(t.get("match", ""))
suffix = f" ({match})" if match else ""
lines.append(f" {t_artist} - {t_name}{suffix}")
await bot.long_reply(message, lines, label="similar tracks")
return
# -- Artist-level results (Last.fm) --
if similar_artists:
search_artist = artist or title
if play_mode:
# Pick a random similar artist and search YouTube
pick = random.choice(similar_artists[:10])
pick_name = pick.get("name", "")
if not pick_name:
await bot.reply(message, "No playable result found")
return
# Inject a !play command with a YouTube search
message.text = f"!play {pick_name}"
music_mod = bot.registry._modules.get("music")
if music_mod:
await music_mod.cmd_play(bot, message)
return
# Display similar artists
lines = [f"Similar to {search_artist}:"]
for a in similar_artists[:8]:
name = a.get("name", "?")
@@ -205,30 +309,27 @@ async def cmd_similar(bot, message):
await bot.long_reply(message, lines, label="similar artists")
return
# Track-level results
if play_mode:
pick = random.choice(similar[:10])
pick_artist = pick.get("artist", {}).get("name", "")
pick_title = pick.get("name", "")
search = f"{pick_artist} {pick_title}".strip()
if not search:
await bot.reply(message, "No playable result found")
# -- MusicBrainz results --
if mb_results:
search_artist = artist or title
if play_mode:
pick = random.choice(mb_results[:10])
search = f"{pick['artist']} {pick['title']}".strip()
message.text = f"!play {search}"
music_mod = bot.registry._modules.get("music")
if music_mod:
await music_mod.cmd_play(bot, message)
return
message.text = f"!play {search}"
music_mod = bot.registry._modules.get("music")
if music_mod:
await music_mod.cmd_play(bot, message)
lines = [f"Similar to {search_artist}:"]
for r in mb_results[:8]:
lines.append(f" {r['artist']} - {r['title']}")
await bot.long_reply(message, lines, label="similar tracks")
return
# Display similar tracks
lines = [f"Similar to {artist} - {title}:"]
for t in similar[:8]:
t_artist = t.get("artist", {}).get("name", "")
t_name = t.get("name", "?")
match = _fmt_match(t.get("match", ""))
suffix = f" ({match})" if match else ""
lines.append(f" {t_artist} - {t_name}{suffix}")
await bot.long_reply(message, lines, label="similar tracks")
# Nothing found
search_artist = artist or title
await bot.reply(message, f"No similar artists found for '{search_artist}'")
@command("tags", help="Music: !tags [artist] -- show genre tags")
@@ -240,9 +341,6 @@ async def cmd_tags(bot, message):
!tags <artist> Tags for named artist
"""
api_key = _get_api_key(bot)
if not api_key:
await bot.reply(message, "Last.fm API key not configured")
return
parts = message.text.split(None, 1)
query = parts[1].strip() if len(parts) > 1 else ""
@@ -259,9 +357,28 @@ async def cmd_tags(bot, message):
await bot.reply(message, "Nothing playing and no artist given")
return
tags = await loop.run_in_executor(
None, _get_top_tags, api_key, artist,
)
# -- Last.fm path --
tags = []
if api_key:
tags = await loop.run_in_executor(
None, _get_top_tags, api_key, artist,
)
# -- MusicBrainz fallback --
if not tags:
try:
from plugins._musicbrainz import mb_artist_tags, mb_search_artist
mbid = await loop.run_in_executor(None, mb_search_artist, artist)
if mbid:
mb_tags = await loop.run_in_executor(
None, mb_artist_tags, mbid,
)
if mb_tags:
await bot.reply(message, f"{artist}: {', '.join(mb_tags)}")
return
except Exception:
log.warning("lastfm: MusicBrainz tag fallback failed",
exc_info=True)
if not tags:
await bot.reply(message, f"No tags found for '{artist}'")

276
plugins/mumble_admin.py Normal file
View File

@@ -0,0 +1,276 @@
"""Plugin: Mumble server administration via chat commands."""
from __future__ import annotations
import logging
from derp.plugin import command
_log = logging.getLogger(__name__)
# -- Helpers -----------------------------------------------------------------
def _find_user(bot, name: str):
"""Case-insensitive user lookup by name. Returns pymumble User or None."""
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return None
lower = name.lower()
for sid in list(mumble.users):
user = mumble.users[sid]
if user["name"].lower() == lower:
return user
return None
def _find_channel(bot, name: str):
"""Case-insensitive channel lookup by name. Returns pymumble Channel or None."""
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return None
lower = name.lower()
for cid in list(mumble.channels):
chan = mumble.channels[cid]
if chan["name"].lower() == lower:
return chan
return None
def _channel_name(bot, channel_id: int) -> str:
"""Resolve a channel ID to its name, or return the ID as string."""
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return str(channel_id)
chan = mumble.channels.get(channel_id)
if chan is None:
return str(channel_id)
return chan["name"]
# -- Sub-handlers ------------------------------------------------------------
async def _sub_kick(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu kick <user> [reason]")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
reason = " ".join(args[1:]) if len(args) > 1 else ""
user.kick(reason)
await bot.reply(message, f"Kicked {user['name']}")
async def _sub_ban(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu ban <user> [reason]")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
reason = " ".join(args[1:]) if len(args) > 1 else ""
user.ban(reason)
await bot.reply(message, f"Banned {user['name']}")
async def _sub_mute(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu mute <user>")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
user.mute()
await bot.reply(message, f"Muted {user['name']}")
async def _sub_unmute(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu unmute <user>")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
user.unmute()
await bot.reply(message, f"Unmuted {user['name']}")
async def _sub_deafen(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu deafen <user>")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
user.deafen()
await bot.reply(message, f"Deafened {user['name']}")
async def _sub_undeafen(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu undeafen <user>")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
user.undeafen()
await bot.reply(message, f"Undeafened {user['name']}")
async def _sub_move(bot, message, args: list[str]) -> None:
if len(args) < 2:
await bot.reply(message, "Usage: !mu move <user> <channel>")
return
user = _find_user(bot, args[0])
if user is None:
await bot.reply(message, f"User not found: {args[0]}")
return
chan = _find_channel(bot, " ".join(args[1:]))
if chan is None:
await bot.reply(message, f"Channel not found: {' '.join(args[1:])}")
return
user.move_in(chan["channel_id"])
await bot.reply(message, f"Moved {user['name']} to {chan['name']}")
async def _sub_users(bot, message, args: list[str]) -> None:
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return
bots = getattr(bot.registry, "_bots", {})
lines: list[str] = []
for sid in sorted(mumble.users):
user = mumble.users[sid]
name = user["name"]
flags: list[str] = []
if name in bots:
flags.append("bot")
if user.get("mute") or user.get("self_mute"):
flags.append("muted")
if user.get("deaf") or user.get("self_deaf"):
flags.append("deaf")
chan = _channel_name(bot, user.get("channel_id", 0))
tag = f" [{', '.join(flags)}]" if flags else ""
lines.append(f" {name} in {chan}{tag}")
header = f"Online: {len(lines)} user(s)"
await bot.reply(message, header + "\n" + "\n".join(lines))
async def _sub_channels(bot, message, args: list[str]) -> None:
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return
lines: list[str] = []
for cid in sorted(mumble.channels):
chan = mumble.channels[cid]
name = chan["name"]
# Count users in this channel
count = sum(
1 for sid in mumble.users
if mumble.users[sid].get("channel_id") == cid
)
lines.append(f" {name} ({count})")
await bot.reply(message, "Channels:\n" + "\n".join(lines))
async def _sub_mkchan(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu mkchan <name> [temp]")
return
mumble = getattr(bot, "_mumble", None)
if mumble is None:
return
name = args[0]
temp = len(args) > 1 and args[1].lower() in ("temp", "temporary", "true")
mumble.channels.new_channel(0, name, temporary=temp)
label = " (temporary)" if temp else ""
await bot.reply(message, f"Created channel: {name}{label}")
async def _sub_rmchan(bot, message, args: list[str]) -> None:
if not args:
await bot.reply(message, "Usage: !mu rmchan <channel>")
return
chan = _find_channel(bot, " ".join(args))
if chan is None:
await bot.reply(message, f"Channel not found: {' '.join(args)}")
return
name = chan["name"]
chan.remove()
await bot.reply(message, f"Removed channel: {name}")
async def _sub_rename(bot, message, args: list[str]) -> None:
if len(args) < 2:
await bot.reply(message, "Usage: !mu rename <channel> <new-name>")
return
chan = _find_channel(bot, args[0])
if chan is None:
await bot.reply(message, f"Channel not found: {args[0]}")
return
old = chan["name"]
chan.rename_channel(args[1])
await bot.reply(message, f"Renamed {old} to {args[1]}")
async def _sub_desc(bot, message, args: list[str]) -> None:
if len(args) < 2:
await bot.reply(message, "Usage: !mu desc <channel> <text>")
return
chan = _find_channel(bot, args[0])
if chan is None:
await bot.reply(message, f"Channel not found: {args[0]}")
return
text = " ".join(args[1:])
chan.set_channel_description(text)
await bot.reply(message, f"Set description for {chan['name']}")
# -- Dispatch table ----------------------------------------------------------
_SUBS: dict[str, object] = {
"kick": _sub_kick,
"ban": _sub_ban,
"mute": _sub_mute,
"unmute": _sub_unmute,
"deafen": _sub_deafen,
"undeafen": _sub_undeafen,
"move": _sub_move,
"users": _sub_users,
"channels": _sub_channels,
"mkchan": _sub_mkchan,
"rmchan": _sub_rmchan,
"rename": _sub_rename,
"desc": _sub_desc,
}
_USAGE = (
"Usage: !mu <action> [args]\n"
"Actions: kick, ban, mute, unmute, deafen, undeafen, move, "
"users, channels, mkchan, rmchan, rename, desc"
)
@command("mu", help="Mumble admin: !mu <action> [args]", tier="admin")
async def cmd_mu(bot, message):
"""Mumble server administration commands."""
parts = message.text.split()
if len(parts) < 2:
await bot.reply(message, _USAGE)
return
sub = parts[1].lower()
handler = _SUBS.get(sub)
if handler is None:
await bot.reply(message, _USAGE)
return
await handler(bot, message, parts[2:])

View File

@@ -58,6 +58,8 @@ def _ps(bot):
"history": [],
"autoplay": cfg.get("autoplay", True),
"autoplay_cooldown": cfg.get("autoplay_cooldown", 30),
"discover": cfg.get("discover", True),
"discover_ratio": cfg.get("discover_ratio", 3),
"announce": cfg.get("announce", False),
"paused": None,
"_watcher_task": None,
@@ -575,17 +577,64 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
seek_req = [None]
ps["seek_req"] = seek_req
_autoplay_pool: list[_Track] = [] # shuffled deck, refilled each cycle
_discover_seen: set[str] = set() # "artist:title" dedup within session
_autoplay_count: int = 0 # autoplay picks since loop start
try:
while ps["queue"] or ps.get("autoplay"):
# Autoplay: cooldown + silence wait, then pick next from shuffled deck
if not ps["queue"]:
if not _autoplay_pool:
kept = _load_kept_tracks(bot)
if not kept:
break
random.shuffle(kept)
_autoplay_pool = kept
log.info("music: autoplay shuffled %d kept tracks", len(kept))
_autoplay_count += 1
# -- Discovery attempt on every Nth autoplay pick --
discovered = False
ratio = ps.get("discover_ratio", 3)
if (ps.get("discover") and ratio > 0
and _autoplay_count % ratio == 0
and ps["history"]):
last = ps["history"][-1]
try:
lfm = bot.registry._modules.get("lastfm")
if lfm and hasattr(lfm, "discover_similar"):
pair = await lfm.discover_similar(bot, last.title)
if pair:
a, t = pair
key = f"{a.lower()}:{t.lower()}"
if key not in _discover_seen:
_discover_seen.add(key)
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(
None, _resolve_tracks,
f"{a} {t}", 1,
)
if res:
discovered = True
pick = _Track(
url=res[0][0], title=res[0][1],
requester="discover",
)
log.info(
"music: discovered '%s' "
"similar to '%s'",
pick.title, last.title,
)
except Exception:
log.warning(
"music: discovery failed, using kept deck",
exc_info=True,
)
# -- Kept-deck fallback --
if not discovered:
if not _autoplay_pool:
kept = _load_kept_tracks(bot)
if not kept:
break
random.shuffle(kept)
_autoplay_pool = kept
log.info("music: autoplay shuffled %d kept tracks",
len(kept))
pick = _autoplay_pool.pop(0)
cooldown = ps.get("autoplay_cooldown", 30)
log.info("music: autoplay cooldown %ds before next track",
cooldown)
@@ -602,9 +651,8 @@ async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) ->
# Re-check: someone may have queued something or stopped
if ps["queue"]:
continue
pick = _autoplay_pool.pop(0)
ps["queue"].append(pick)
log.info("music: autoplay picked '%s' (%d remaining)",
log.info("music: autoplay queued '%s' (%d pool remaining)",
pick.title, len(_autoplay_pool))
track = ps["queue"].pop(0)
ps["current"] = track
@@ -1194,15 +1242,30 @@ async def cmd_queue(bot, message):
ps = _ps(bot)
lines = []
if ps["current"]:
track = ps["current"]
progress = ps.get("progress")
cur_seek = ps.get("cur_seek", 0.0)
elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0)
pos = _fmt_time(elapsed)
if track.duration > 0:
pos = f"{pos}/{_fmt_time(track.duration)}"
lines.append(
f"Now: {_truncate(ps['current'].title)}"
f" [{ps['current'].requester}]"
f"Now: {_truncate(track.title)}"
f" [{track.requester}] ({pos})"
)
if ps["queue"]:
total_dur = 0.0
for i, track in enumerate(ps["queue"], 1):
dur = f" ({_fmt_time(track.duration)})" if track.duration > 0 else ""
total_dur += track.duration
lines.append(
f" {i}. {_truncate(track.title)} [{track.requester}]"
f" {i}. {_truncate(track.title)} [{track.requester}]{dur}"
)
count = len(ps["queue"])
footer = f"Queue: {count} track{'s' if count != 1 else ''}"
if total_dur > 0:
footer += f", {_fmt_time(total_dur)} total"
lines.append(footer)
else:
if not ps["current"]:
lines.append("Queue empty")
@@ -1659,6 +1722,184 @@ async def _kept_repair(bot, message) -> None:
await bot.reply(message, msg)
@command("playlist", help="Music: !playlist save|load|list|del <name>")
async def cmd_playlist(bot, message):
"""Save, load, list, delete, import, or show named playlists.
Usage:
!playlist save <name> Save current + queued tracks as a playlist
!playlist load <name> [shuffle] Append saved playlist to queue
!playlist list Show saved playlists with track counts
!playlist del <name> Delete a saved playlist
!playlist import <name> <url> Import tracks from URL as a named playlist
!playlist show <name> Display tracks in a saved playlist
"""
if not _is_mumble(bot):
await bot.reply(message, "Mumble-only feature")
return
parts = message.text.split()
if len(parts) < 2:
await bot.reply(
message, "Usage: !playlist save|load|list|del|import|show <name>",
)
return
sub = parts[1].lower()
if sub == "save":
if len(parts) < 3:
await bot.reply(message, "Usage: !playlist save <name>")
return
name = parts[2].lower()
ps = _ps(bot)
entries = []
if ps["current"]:
t = ps["current"]
entries.append({"url": t.url, "title": t.title,
"requester": t.requester})
for t in ps["queue"]:
entries.append({"url": t.url, "title": t.title,
"requester": t.requester})
if not entries:
await bot.reply(message, "Nothing to save")
return
bot.state.set("music", f"playlist:{name}", json.dumps(entries))
await bot.reply(
message,
f"Saved playlist '{name}' ({len(entries)} track"
f"{'s' if len(entries) != 1 else ''})",
)
elif sub == "load":
if len(parts) < 3:
await bot.reply(message, "Usage: !playlist load <name> [shuffle]")
return
name = parts[2].lower()
shuffle = len(parts) >= 4 and parts[3].lower() == "shuffle"
raw = bot.state.get("music", f"playlist:{name}")
if not raw:
await bot.reply(message, f"No playlist named '{name}'")
return
try:
entries = json.loads(raw)
except (json.JSONDecodeError, TypeError):
await bot.reply(message, f"Corrupt playlist '{name}'")
return
ps = _ps(bot)
was_idle = ps["current"] is None
added = 0
for e in entries:
if len(ps["queue"]) >= _MAX_QUEUE:
break
ps["queue"].append(_Track(
url=e["url"], title=e.get("title", e["url"]),
requester=e.get("requester", "?"),
))
added += 1
if shuffle and ps["queue"]:
random.shuffle(ps["queue"])
suffix = " (shuffled)" if shuffle else ""
await bot.reply(
message,
f"Loaded '{name}': {added} track{'s' if added != 1 else ''}{suffix}",
)
if was_idle:
_ensure_loop(bot)
elif sub == "list":
names = []
for key in bot.state.keys("music"):
if not key.startswith("playlist:"):
continue
pname = key.split(":", 1)[1]
raw = bot.state.get("music", key)
count = 0
if raw:
try:
count = len(json.loads(raw))
except (json.JSONDecodeError, TypeError):
pass
names.append((pname, count))
if not names:
await bot.reply(message, "No saved playlists")
return
names.sort()
lines = [f"Playlists ({len(names)}):"]
for pname, count in names:
lines.append(
f" {pname} ({count} track{'s' if count != 1 else ''})",
)
for line in lines:
await bot.reply(message, line)
elif sub == "del":
if len(parts) < 3:
await bot.reply(message, "Usage: !playlist del <name>")
return
name = parts[2].lower()
raw = bot.state.get("music", f"playlist:{name}")
if not raw:
await bot.reply(message, f"No playlist named '{name}'")
return
bot.state.delete("music", f"playlist:{name}")
await bot.reply(message, f"Deleted playlist '{name}'")
elif sub == "import":
if len(parts) < 4:
await bot.reply(message, "Usage: !playlist import <name> <url>")
return
name = parts[2].lower()
url = parts[3]
await bot.reply(message, f"Importing '{name}' from URL...")
loop = asyncio.get_running_loop()
try:
resolved = await loop.run_in_executor(None, _resolve_tracks, url)
except Exception:
await bot.reply(message, "Failed to resolve URL")
return
if not resolved:
await bot.reply(message, "No tracks found")
return
requester = message.nick or "?"
entries = [{"url": u, "title": t, "requester": requester}
for u, t in resolved]
bot.state.set("music", f"playlist:{name}", json.dumps(entries))
await bot.reply(
message,
f"Imported playlist '{name}' ({len(entries)} track"
f"{'s' if len(entries) != 1 else ''})",
)
elif sub == "show":
if len(parts) < 3:
await bot.reply(message, "Usage: !playlist show <name>")
return
name = parts[2].lower()
raw = bot.state.get("music", f"playlist:{name}")
if not raw:
await bot.reply(message, f"No playlist named '{name}'")
return
try:
entries = json.loads(raw)
except (json.JSONDecodeError, TypeError):
await bot.reply(message, f"Corrupt playlist '{name}'")
return
if not entries:
await bot.reply(message, f"Playlist '{name}' is empty")
return
lines = [f"Playlist '{name}' ({len(entries)} tracks):"]
for i, e in enumerate(entries, 1):
title = _truncate(e.get("title", e["url"]))
lines.append(f" {i:>2}. {title}")
await bot.long_reply(message, lines, label=name)
else:
await bot.reply(
message, "Usage: !playlist save|load|list|del|import|show <name>",
)
# -- Plugin lifecycle --------------------------------------------------------

View File

@@ -47,9 +47,12 @@ _PIPER_URL = "http://192.168.129.9:5100/"
def _ps(bot):
"""Per-bot plugin runtime state."""
cfg = getattr(bot, "config", {}).get("voice", {})
trigger = cfg.get("trigger", "")
# Bias Whisper toward the trigger word unless explicitly configured
default_prompt = f"{trigger.capitalize()}, " if trigger else ""
return bot._pstate.setdefault("voice", {
"listen": False,
"trigger": cfg.get("trigger", ""),
"trigger": trigger,
"buffers": {}, # {username: bytearray}
"last_ts": {}, # {username: float monotonic}
"flush_task": None,
@@ -62,6 +65,7 @@ def _ps(bot):
"noise_scale": cfg.get("noise_scale", 0.667),
"noise_w": cfg.get("noise_w", 0.8),
"fx": cfg.get("fx", ""),
"initial_prompt": cfg.get("initial_prompt", default_prompt),
"_listener_registered": False,
})
@@ -170,8 +174,17 @@ def _transcribe(ps, pcm: bytes) -> str:
).encode() + wav_data + (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="response_format"\r\n\r\n'
f"json\r\n--{boundary}--\r\n"
f"json"
).encode()
# Bias Whisper toward the trigger word when configured
prompt = ps.get("initial_prompt", "")
if prompt:
body += (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="initial_prompt"\r\n\r\n'
f"{prompt}"
).encode()
body += f"\r\n--{boundary}--\r\n".encode()
req = urllib.request.Request(ps["whisper_url"], data=body, method="POST")
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
resp = _urlopen(req, timeout=30, proxy=False)
@@ -552,27 +565,13 @@ async def cmd_audition(bot, message):
f"{_deep},{_bass},{_echo_chamber}"),
]
# Find merlin (the listener bot) -- plays the audition samples
merlin = None
for peer in getattr(bot.registry, "_bots", {}).values():
if getattr(peer, "_receive_sound", False):
merlin = peer
break
await bot.reply(message, f"Auditioning {len(samples)} voice samples...")
loop = asyncio.get_running_loop()
from pathlib import Path
# Pre-generate derp's default voice (same phrase, no FX)
derp_wav = await loop.run_in_executor(
None, lambda: _fetch_tts_voice(piper_url, phrase),
)
for i, (label, voice, sid, fx) in enumerate(samples, 1):
announcer = merlin or bot
await announcer.send("0", f"[{i}/{len(samples)}] {label}")
await bot.send("0", f"[{i}/{len(samples)}] {label}")
await asyncio.sleep(1)
# Generate the audition sample (merlin's candidate voice)
sample_wav = await loop.run_in_executor(
None, lambda v=voice, s=sid, f=fx: _fetch_tts_voice(
piper_url, phrase, voice=v, speaker_id=s,
@@ -583,40 +582,36 @@ async def cmd_audition(bot, message):
await bot.send("0", " (failed)")
continue
try:
# Both bots speak simultaneously:
# merlin plays the audition sample, derp plays its default voice
merlin_done = asyncio.Event()
derp_done = asyncio.Event()
if merlin:
merlin_task = asyncio.create_task(
merlin.stream_audio(sample_wav, volume=1.0,
on_done=merlin_done))
derp_task = asyncio.create_task(
bot.stream_audio(derp_wav, volume=1.0,
on_done=derp_done))
await asyncio.gather(merlin_task, derp_task)
else:
await bot.stream_audio(sample_wav, volume=1.0,
on_done=merlin_done)
await merlin_done.wait()
done = asyncio.Event()
await bot.stream_audio(sample_wav, volume=1.0, on_done=done)
await done.wait()
finally:
Path(sample_wav).unlink(missing_ok=True)
await asyncio.sleep(2)
if derp_wav:
Path(derp_wav).unlink(missing_ok=True)
announcer = merlin or bot
await announcer.send("0", "Audition complete.")
await bot.send("0", "Audition complete.")
# -- Plugin lifecycle --------------------------------------------------------
async def on_connected(bot) -> None:
"""Re-register listener after reconnect."""
"""Re-register listener after reconnect; play TTS greeting on first connect."""
if not _is_mumble(bot):
return
ps = _ps(bot)
if ps["listen"] or ps["trigger"]:
_ensure_listener(bot)
_ensure_flush_task(bot)
# Greet via TTS on first connection only
greet = getattr(bot, "config", {}).get("mumble", {}).get("greet")
if greet and not ps.get("_greeted"):
ps["_greeted"] = True
ready = getattr(bot, "_is_audio_ready", None)
if ready:
for _ in range(20):
if ready():
break
await asyncio.sleep(0.5)
bot._spawn(_tts_play(bot, greet), name="voice-greet")

View File

@@ -253,7 +253,10 @@ class Bot:
async def _loop(self) -> None:
"""Read and dispatch messages until disconnect."""
while self._running:
line = await self.conn.readline()
try:
line = await asyncio.wait_for(self.conn.readline(), timeout=2.0)
except asyncio.TimeoutError:
continue
if line is None:
log.warning("server closed connection")
return

View File

@@ -10,6 +10,7 @@ import sys
from derp import __version__
from derp.bot import Bot
from derp.config import build_server_configs, resolve_config
from derp.irc import format_msg
from derp.log import JsonFormatter
from derp.plugin import PluginRegistry
@@ -37,8 +38,8 @@ def build_parser() -> argparse.ArgumentParser:
"--cprofile",
metavar="PATH",
nargs="?",
const="derp.prof",
help="enable cProfile; dump stats to PATH [derp.prof]",
const="data/derp.prof",
help="enable cProfile; dump stats to PATH [data/derp.prof]",
)
p.add_argument(
"--tracemalloc",
@@ -72,12 +73,24 @@ def _run(bots: list) -> None:
def _shutdown(bots: list) -> None:
"""Signal handler: stop all bot loops so cProfile can flush."""
"""Signal handler: stop all bot loops and tear down connections."""
logging.getLogger("derp").info("SIGTERM received, shutting down")
loop = asyncio.get_running_loop()
for bot in bots:
bot._running = False
if hasattr(bot, "conn"):
asyncio.get_running_loop().create_task(bot.conn.close())
if hasattr(bot, "conn") and bot.conn.connected:
loop.create_task(_quit_and_close(bot))
elif hasattr(bot, "_mumble") and bot._mumble:
bot._mumble.stop()
async def _quit_and_close(bot) -> None:
"""Send IRC QUIT and close the connection."""
try:
await bot.conn.send(format_msg("QUIT", "shutting down"))
except Exception:
pass
await bot.conn.close()
def _dump_tracemalloc(log: logging.Logger, path: str, limit: int = 25) -> None:
@@ -193,10 +206,32 @@ def main(argv: list[str] | None = None) -> int:
if args.cprofile:
import cProfile
import threading
log.info("cProfile enabled, output: %s", args.cprofile)
cProfile.runctx("_run(bots)", globals(), {"bots": bots, "_run": _run}, args.cprofile)
log.info("profile saved to %s", args.cprofile)
prof = cProfile.Profile()
prof_path = args.cprofile
prof_interval = 10 # dump every 10 seconds
prof_stop = threading.Event()
def _periodic_dump():
while not prof_stop.wait(prof_interval):
try:
prof.dump_stats(prof_path)
except Exception:
pass
dumper = threading.Thread(target=_periodic_dump, daemon=True)
dumper.start()
log.info("cProfile enabled, output: %s (saves every %ds)",
prof_path, prof_interval)
prof.enable()
try:
_run(bots)
finally:
prof.disable()
prof_stop.set()
prof.dump_stats(prof_path)
log.info("profile saved to %s", prof_path)
else:
_run(bots)

View File

@@ -218,11 +218,27 @@ class MumbleBot:
self._on_sound_received,
)
self._mumble.set_receive_sound(self._receive_sound)
# Raise retry interval so 2+ bots on the same IP don't trip
# the server's autoban (default: 10 attempts / 120s).
import pymumble_py3.mumble as _pm
if getattr(_pm, "PYMUMBLE_CONNECTION_RETRY_INTERVAL", 0) < 15:
_pm.PYMUMBLE_CONNECTION_RETRY_INTERVAL = 15
self._mumble.start()
self._mumble.is_ready()
def _on_connected(self) -> None:
"""Callback from pymumble thread: connection established."""
# Enable TCP keepalive on the control socket to prevent NAT
# gateways from dropping the mapping during idle periods.
try:
import socket as _sock
raw = self._mumble.control_socket
raw.setsockopt(_sock.SOL_SOCKET, _sock.SO_KEEPALIVE, 1)
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPIDLE, 10)
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPINTVL, 5)
raw.setsockopt(_sock.IPPROTO_TCP, _sock.TCP_KEEPCNT, 3)
except Exception:
pass
self._connect_count += 1
kind = "reconnected" if self._connect_count > 1 else "connected"
session = getattr(self._mumble.users, "myself_session", "?")
@@ -238,6 +254,16 @@ class MumbleBot:
self._mumble.users.myself.deafen()
except Exception:
log.exception("mumble: failed to self-deafen on connect")
# Self-register on first connect so the server stores the cert
# and treats this bot as a known user (persistent identity).
if self._connect_count == 1:
try:
myself = self._mumble.users.myself
if not myself.get("user_id"):
myself.register()
log.info("mumble: self-registered %s", self._username)
except Exception:
log.debug("mumble: self-register skipped (already registered?)")
if self._loop:
asyncio.run_coroutine_threadsafe(
self._notify_plugins_connected(), self._loop,
@@ -268,19 +294,7 @@ class MumbleBot:
await self._play_greet()
async def _play_greet(self) -> None:
"""Speak the greeting via TTS on connect (voice only, no text)."""
greet = self.config.get("mumble", {}).get("greet")
if not greet:
return
voice_mod = self.registry._modules.get("voice")
tts_play = getattr(voice_mod, "_tts_play", None) if voice_mod else None
if tts_play is None:
return
for _ in range(20):
if self._is_audio_ready():
break
await asyncio.sleep(0.5)
self._spawn(tts_play(self, greet), name="voice-greet")
"""No-op: greeting is now handled by the voice plugin's on_connected."""
def _on_disconnected(self) -> None:
"""Callback from pymumble thread: connection lost."""
@@ -706,8 +720,8 @@ class MumbleBot:
pass
_get_vol = volume if callable(volume) else lambda: volume
log.info("stream_audio: starting pipeline for %s (vol=%.0f%%, seek=%.1fs)",
url, _get_vol() * 100, seek)
log.info("stream_audio: [%s] starting pipeline for %s (vol=%.0f%%, seek=%.1fs)",
self._username, url, _get_vol() * 100, seek)
def _build_cmd(seek_pos):
seek_flag = f" -ss {seek_pos:.3f}" if seek_pos > 0 else ""
@@ -803,15 +817,17 @@ class MumbleBot:
if not self._is_audio_ready():
# Disconnected -- keep reading ffmpeg at real-time pace
if _was_feeding:
log.warning("stream_audio: connection lost, "
"dropping frames at %d", frames)
log.warning("stream_audio: [%s] connection lost, "
"dropping frames at %d",
self._username, frames)
_was_feeding = False
await asyncio.sleep(0.02)
continue
if not _was_feeding:
log.info("stream_audio: connection restored, "
"resuming feed at frame %d", frames)
log.info("stream_audio: [%s] connection restored, "
"resuming feed at frame %d",
self._username, frames)
_was_feeding = True
# Seek: fade-out in progress
@@ -879,7 +895,8 @@ class MumbleBot:
continue
if frames == 1:
log.info("stream_audio: first frame fed to pymumble")
log.info("stream_audio: [%s] first frame fed to pymumble",
self._username)
# Keep buffer at most 1 second ahead
try:
@@ -896,7 +913,8 @@ class MumbleBot:
await asyncio.sleep(0.1)
except (TypeError, AttributeError):
pass
log.info("stream_audio: finished, %d frames", frames)
log.info("stream_audio: [%s] finished, %d frames",
self._username, frames)
except asyncio.CancelledError:
# Only clear the buffer if volume is still audible -- if a
# fade-out has already driven _cur_vol to ~0 the remaining
@@ -907,11 +925,12 @@ class MumbleBot:
self._mumble.sound_output.clear_buffer()
except Exception:
pass
log.info("stream_audio: cancelled at frame %d (vol=%.3f)",
frames, _cur_vol)
log.info("stream_audio: [%s] cancelled at frame %d (vol=%.3f)",
self._username, frames, _cur_vol)
raise
except Exception:
log.exception("stream_audio: error at frame %d", frames)
log.exception("stream_audio: [%s] error at frame %d",
self._username, frames)
raise
finally:
try:

View File

@@ -420,7 +420,7 @@ class TestExtractVideos:
def close(self):
pass
with patch("urllib.request.urlopen", return_value=FakeResp()):
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
results = _search_youtube("test")
assert len(results) == 1
assert results[0]["id"] == "dup1"
@@ -438,7 +438,7 @@ class TestSearchYoutube:
def close(self):
pass
with patch("urllib.request.urlopen", return_value=FakeResp()):
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
results = _search_youtube("test query")
assert len(results) == 2
assert results[0]["id"] == "abc123"
@@ -446,7 +446,7 @@ class TestSearchYoutube:
def test_http_error_propagates(self):
import pytest
with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
with pytest.raises(ConnectionError):
_search_youtube("test")
@@ -1263,7 +1263,7 @@ class TestSearchSearx:
def close(self):
pass
with patch("urllib.request.urlopen", return_value=FakeResp()):
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
results = _search_searx("test query")
# Same response served for all categories; deduped by URL
assert len(results) == 3
@@ -1281,13 +1281,13 @@ class TestSearchSearx:
def close(self):
pass
with patch("urllib.request.urlopen", return_value=FakeResp()):
with patch.object(_mod, "_urlopen", return_value=FakeResp()):
results = _search_searx("nothing")
assert results == []
def test_http_error_returns_empty(self):
"""SearXNG catches per-category errors; all failing returns empty."""
with patch("urllib.request.urlopen", side_effect=ConnectionError("fail")):
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
results = _search_searx("test")
assert results == []

723
tests/test_lastfm.py Normal file
View File

@@ -0,0 +1,723 @@
"""Tests for the Last.fm music discovery plugin."""
import asyncio
import importlib.util
import json
import sys
from dataclasses import dataclass
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
_spec = importlib.util.spec_from_file_location("lastfm", "plugins/lastfm.py")
_mod = importlib.util.module_from_spec(_spec)
sys.modules["lastfm"] = _mod
_spec.loader.exec_module(_mod)
# -- Fakes -------------------------------------------------------------------
class _FakeRegistry:
def __init__(self):
self._modules: dict = {}
class _FakeBot:
def __init__(self, *, api_key: str = "test-key"):
self.replied: list[str] = []
self.config: dict = {"lastfm": {"api_key": api_key}} if api_key else {}
self._pstate: dict = {}
self.registry = _FakeRegistry()
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def long_reply(self, message, lines: list[str], *,
label: str = "") -> None:
for line in lines:
self.replied.append(line)
class _Msg:
def __init__(self, text="!similar", nick="Alice", target="0",
is_channel=True):
self.text = text
self.nick = nick
self.target = target
self.is_channel = is_channel
self.prefix = nick
self.command = "PRIVMSG"
self.params = [target, text]
self.tags = {}
self.raw = {}
@dataclass(slots=True)
class _FakeTrack:
url: str = ""
title: str = ""
requester: str = ""
# -- API response fixtures ---------------------------------------------------
SIMILAR_ARTISTS_RESP = {
"similarartists": {
"artist": [
{"name": "Artist B", "match": "0.85"},
{"name": "Artist C", "match": "0.72"},
{"name": "Artist D", "match": "0.60"},
],
},
}
SIMILAR_TRACKS_RESP = {
"similartracks": {
"track": [
{"name": "Track X", "artist": {"name": "Artist B"}, "match": "0.9"},
{"name": "Track Y", "artist": {"name": "Artist C"}, "match": "0.7"},
],
},
}
TOP_TAGS_RESP = {
"toptags": {
"tag": [
{"name": "rock", "count": 100},
{"name": "alternative", "count": 80},
{"name": "indie", "count": 60},
],
},
}
TRACK_SEARCH_RESP = {
"results": {
"trackmatches": {
"track": [
{"name": "Found Track", "artist": "Found Artist"},
{"name": "Another", "artist": "Someone"},
],
},
},
}
# ---------------------------------------------------------------------------
# TestGetApiKey
# ---------------------------------------------------------------------------
class TestGetApiKey:
def test_from_config(self):
bot = _FakeBot(api_key="cfg-key")
assert _mod._get_api_key(bot) == "cfg-key"
def test_from_env(self):
bot = _FakeBot(api_key="")
with patch.dict("os.environ", {"LASTFM_API_KEY": "env-key"}):
assert _mod._get_api_key(bot) == "env-key"
def test_env_takes_priority(self):
bot = _FakeBot(api_key="cfg-key")
with patch.dict("os.environ", {"LASTFM_API_KEY": "env-key"}):
assert _mod._get_api_key(bot) == "env-key"
def test_empty_when_unset(self):
bot = _FakeBot(api_key="")
with patch.dict("os.environ", {}, clear=True):
assert _mod._get_api_key(bot) == ""
# ---------------------------------------------------------------------------
# TestApiCall
# ---------------------------------------------------------------------------
class TestApiCall:
def test_parses_json(self):
resp = MagicMock()
resp.read.return_value = b'{"result": "ok"}'
with patch.object(_mod, "urlopen", create=True, return_value=resp):
# _api_call imports urlopen from derp.http at call time
with patch("derp.http.urlopen", return_value=resp):
data = _mod._api_call("key", "artist.getSimilar", artist="X")
assert data == {"result": "ok"}
def test_returns_empty_on_error(self):
with patch("derp.http.urlopen", side_effect=ConnectionError("fail")):
data = _mod._api_call("key", "artist.getSimilar", artist="X")
assert data == {}
# ---------------------------------------------------------------------------
# TestGetSimilarArtists
# ---------------------------------------------------------------------------
class TestGetSimilarArtists:
def test_returns_list(self):
with patch.object(_mod, "_api_call", return_value=SIMILAR_ARTISTS_RESP):
result = _mod._get_similar_artists("key", "Artist A")
assert len(result) == 3
assert result[0]["name"] == "Artist B"
def test_single_dict_wrapped(self):
"""Single artist result (dict instead of list) gets wrapped."""
data = {"similarartists": {"artist": {"name": "Solo", "match": "1.0"}}}
with patch.object(_mod, "_api_call", return_value=data):
result = _mod._get_similar_artists("key", "X")
assert len(result) == 1
assert result[0]["name"] == "Solo"
def test_empty_response(self):
with patch.object(_mod, "_api_call", return_value={}):
result = _mod._get_similar_artists("key", "X")
assert result == []
def test_missing_key(self):
with patch.object(_mod, "_api_call", return_value={"error": 6}):
result = _mod._get_similar_artists("key", "X")
assert result == []
# ---------------------------------------------------------------------------
# TestGetTopTags
# ---------------------------------------------------------------------------
class TestGetTopTags:
def test_returns_list(self):
with patch.object(_mod, "_api_call", return_value=TOP_TAGS_RESP):
result = _mod._get_top_tags("key", "Artist A")
assert len(result) == 3
assert result[0]["name"] == "rock"
def test_single_dict_wrapped(self):
data = {"toptags": {"tag": {"name": "electronic", "count": 50}}}
with patch.object(_mod, "_api_call", return_value=data):
result = _mod._get_top_tags("key", "X")
assert len(result) == 1
def test_empty_response(self):
with patch.object(_mod, "_api_call", return_value={}):
result = _mod._get_top_tags("key", "X")
assert result == []
# ---------------------------------------------------------------------------
# TestGetSimilarTracks
# ---------------------------------------------------------------------------
class TestGetSimilarTracks:
def test_returns_list(self):
with patch.object(_mod, "_api_call", return_value=SIMILAR_TRACKS_RESP):
result = _mod._get_similar_tracks("key", "A", "T")
assert len(result) == 2
assert result[0]["name"] == "Track X"
def test_single_dict_wrapped(self):
data = {"similartracks": {"track": {"name": "Solo", "artist": {"name": "X"}}}}
with patch.object(_mod, "_api_call", return_value=data):
result = _mod._get_similar_tracks("key", "X", "T")
assert len(result) == 1
def test_empty_response(self):
with patch.object(_mod, "_api_call", return_value={}):
result = _mod._get_similar_tracks("key", "X", "T")
assert result == []
# ---------------------------------------------------------------------------
# TestSearchTrack
# ---------------------------------------------------------------------------
class TestSearchTrack:
def test_returns_results(self):
with patch.object(_mod, "_api_call", return_value=TRACK_SEARCH_RESP):
result = _mod._search_track("key", "test")
assert len(result) == 2
assert result[0]["name"] == "Found Track"
def test_single_dict_wrapped(self):
data = {"results": {"trackmatches": {
"track": {"name": "One", "artist": "X"},
}}}
with patch.object(_mod, "_api_call", return_value=data):
result = _mod._search_track("key", "test")
assert len(result) == 1
def test_empty_response(self):
with patch.object(_mod, "_api_call", return_value={}):
result = _mod._search_track("key", "test")
assert result == []
# ---------------------------------------------------------------------------
# TestCurrentMeta
# ---------------------------------------------------------------------------
class TestCurrentMeta:
def test_no_music_state(self):
bot = _FakeBot()
assert _mod._current_meta(bot) == ("", "")
def test_no_current_track(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": None}
assert _mod._current_meta(bot) == ("", "")
def test_dash_separator(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="Tool - Lateralus")}
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
def test_double_dash_separator(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="Tool -- Lateralus")}
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
def test_pipe_separator(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="Tool | Lateralus")}
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
def test_tilde_separator(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="Tool ~ Lateralus")}
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
def test_no_separator(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="Lateralus")}
assert _mod._current_meta(bot) == ("", "Lateralus")
def test_empty_title(self):
bot = _FakeBot()
bot._pstate["music"] = {"current": _FakeTrack(title="")}
assert _mod._current_meta(bot) == ("", "")
def test_strips_whitespace(self):
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title=" Tool - Lateralus "),
}
assert _mod._current_meta(bot) == ("Tool", "Lateralus")
# ---------------------------------------------------------------------------
# TestFmtMatch
# ---------------------------------------------------------------------------
class TestFmtMatch:
def test_float_score(self):
assert _mod._fmt_match(0.85) == "85%"
def test_string_score(self):
assert _mod._fmt_match("0.72") == "72%"
def test_one(self):
assert _mod._fmt_match(1.0) == "100%"
def test_zero(self):
assert _mod._fmt_match(0.0) == "0%"
def test_invalid(self):
assert _mod._fmt_match("bad") == ""
def test_none(self):
assert _mod._fmt_match(None) == ""
# ---------------------------------------------------------------------------
# TestCmdSimilar
# ---------------------------------------------------------------------------
class TestCmdSimilar:
def test_no_api_key_mb_fallback(self):
"""No API key falls back to MusicBrainz for similar results."""
bot = _FakeBot(api_key="")
msg = _Msg(text="!similar Tool")
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist",
return_value="mbid-123"), \
patch("plugins._musicbrainz.mb_artist_tags",
return_value=["rock", "metal"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Similar to Tool" in r for r in bot.replied)
assert any("MB Artist" in r for r in bot.replied)
def test_no_api_key_mb_no_results(self):
"""No API key + MusicBrainz returns nothing shows 'no similar'."""
bot = _FakeBot(api_key="")
msg = _Msg(text="!similar Tool")
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist",
return_value=None):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("No similar artists" in r for r in bot.replied)
def test_no_api_key_play_mode(self):
"""No API key + play mode delegates to cmd_play via MB results."""
bot = _FakeBot(api_key="")
msg = _Msg(text="!similar play Tool")
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
play_called = []
async def fake_play(b, m):
play_called.append(m.text)
music_mod = MagicMock()
music_mod.cmd_play = fake_play
bot.registry._modules["music"] = music_mod
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist",
return_value="mbid-123"), \
patch("plugins._musicbrainz.mb_artist_tags",
return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
asyncio.run(_mod.cmd_similar(bot, msg))
assert len(play_called) == 1
assert "MB Band" in play_called[0]
def test_no_artist_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!similar")
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_artist_query_shows_similar(self):
bot = _FakeBot()
msg = _Msg(text="!similar Tool")
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists",
return_value=SIMILAR_ARTISTS_RESP["similarartists"]["artist"]):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Similar to Tool" in r for r in bot.replied)
assert any("Artist B" in r for r in bot.replied)
def test_track_level_similarity(self):
"""When current track has artist + title, tries track similarity first."""
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Tool - Lateralus"),
}
msg = _Msg(text="!similar")
tracks = SIMILAR_TRACKS_RESP["similartracks"]["track"]
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Similar to Tool - Lateralus" in r for r in bot.replied)
assert any("Track X" in r for r in bot.replied)
def test_falls_back_to_artist(self):
"""Falls back to artist similarity when no track results."""
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Tool - Lateralus"),
}
msg = _Msg(text="!similar")
artists = SIMILAR_ARTISTS_RESP["similarartists"]["artist"]
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists", return_value=artists):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Similar to Tool" in r for r in bot.replied)
def test_no_similar_found(self):
bot = _FakeBot()
msg = _Msg(text="!similar Obscure Band")
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists", return_value=[]):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("No similar artists" in r for r in bot.replied)
def test_play_mode_artist(self):
"""!similar play delegates to music cmd_play."""
bot = _FakeBot()
msg = _Msg(text="!similar play Tool")
artists = [{"name": "Deftones", "match": "0.8"}]
play_called = []
async def fake_play(b, m):
play_called.append(m.text)
music_mod = MagicMock()
music_mod.cmd_play = fake_play
bot.registry._modules["music"] = music_mod
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists", return_value=artists):
asyncio.run(_mod.cmd_similar(bot, msg))
assert len(play_called) == 1
assert "Deftones" in play_called[0]
def test_play_mode_track(self):
"""!similar play with track-level results delegates to cmd_play."""
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Tool - Lateralus"),
}
msg = _Msg(text="!similar play")
tracks = [{"name": "Schism", "artist": {"name": "Tool"}, "match": "0.9"}]
play_called = []
async def fake_play(b, m):
play_called.append(m.text)
music_mod = MagicMock()
music_mod.cmd_play = fake_play
bot.registry._modules["music"] = music_mod
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
asyncio.run(_mod.cmd_similar(bot, msg))
assert len(play_called) == 1
assert "Tool" in play_called[0]
assert "Schism" in play_called[0]
def test_match_score_displayed(self):
bot = _FakeBot()
msg = _Msg(text="!similar Tool")
artists = [{"name": "Deftones", "match": "0.85"}]
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists", return_value=artists):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("85%" in r for r in bot.replied)
def test_current_track_no_separator(self):
"""Title without separator uses whole title as search artist."""
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Lateralus"),
}
msg = _Msg(text="!similar")
artists = [{"name": "APC", "match": "0.7"}]
with patch.object(_mod, "_get_similar_tracks", return_value=[]):
with patch.object(_mod, "_get_similar_artists", return_value=artists):
asyncio.run(_mod.cmd_similar(bot, msg))
assert any("Similar to Lateralus" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestCmdTags
# ---------------------------------------------------------------------------
class TestCmdTags:
def test_no_api_key_mb_fallback(self):
"""No API key falls back to MusicBrainz for tags."""
bot = _FakeBot(api_key="")
msg = _Msg(text="!tags Tool")
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist",
return_value="mbid-123"), \
patch("plugins._musicbrainz.mb_artist_tags",
return_value=["rock", "progressive metal", "art rock"]):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("Tool:" in r for r in bot.replied)
assert any("rock" in r for r in bot.replied)
assert any("progressive metal" in r for r in bot.replied)
def test_no_api_key_mb_no_results(self):
"""No API key + MusicBrainz returns nothing shows 'no tags'."""
bot = _FakeBot(api_key="")
msg = _Msg(text="!tags Obscure")
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist",
return_value=None):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("No tags found" in r for r in bot.replied)
def test_no_artist_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!tags")
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_shows_tags(self):
bot = _FakeBot()
msg = _Msg(text="!tags Tool")
tags = TOP_TAGS_RESP["toptags"]["tag"]
with patch.object(_mod, "_get_top_tags", return_value=tags):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("rock" in r for r in bot.replied)
assert any("alternative" in r for r in bot.replied)
assert any("Tool:" in r for r in bot.replied)
def test_no_tags_found(self):
bot = _FakeBot()
msg = _Msg(text="!tags Obscure")
with patch.object(_mod, "_get_top_tags", return_value=[]):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("No tags found" in r for r in bot.replied)
def test_from_current_track(self):
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Tool - Lateralus"),
}
msg = _Msg(text="!tags")
tags = [{"name": "prog metal", "count": 100}]
with patch.object(_mod, "_get_top_tags", return_value=tags):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("Tool:" in r for r in bot.replied)
assert any("prog metal" in r for r in bot.replied)
def test_from_current_no_separator(self):
"""Uses full title as artist when no separator."""
bot = _FakeBot()
bot._pstate["music"] = {
"current": _FakeTrack(title="Lateralus"),
}
msg = _Msg(text="!tags")
tags = [{"name": "rock", "count": 50}]
with patch.object(_mod, "_get_top_tags", return_value=tags):
asyncio.run(_mod.cmd_tags(bot, msg))
assert any("Lateralus:" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestParseTitle
# ---------------------------------------------------------------------------
class TestParseTitle:
def test_dash_separator(self):
assert _mod._parse_title("Tool - Lateralus") == ("Tool", "Lateralus")
def test_double_dash(self):
assert _mod._parse_title("Tool -- Lateralus") == ("Tool", "Lateralus")
def test_pipe_separator(self):
assert _mod._parse_title("Tool | Lateralus") == ("Tool", "Lateralus")
def test_tilde_separator(self):
assert _mod._parse_title("Tool ~ Lateralus") == ("Tool", "Lateralus")
def test_no_separator(self):
assert _mod._parse_title("Lateralus") == ("", "Lateralus")
def test_empty_string(self):
assert _mod._parse_title("") == ("", "")
def test_strips_whitespace(self):
assert _mod._parse_title(" Tool - Lateralus ") == ("Tool", "Lateralus")
def test_first_separator_wins(self):
"""Only the first matching separator is used."""
assert _mod._parse_title("A - B - C") == ("A", "B - C")
def test_dash_priority_over_pipe(self):
"""Dash separator is tried before pipe."""
assert _mod._parse_title("A - B | C") == ("A", "B | C")
# ---------------------------------------------------------------------------
# TestDiscoverSimilar
# ---------------------------------------------------------------------------
class TestDiscoverSimilar:
def test_lastfm_path(self):
"""Returns Last.fm result when API key + results available."""
bot = _FakeBot(api_key="test-key")
tracks = [{"name": "Found", "artist": {"name": "Band"}, "match": "0.9"}]
with patch.object(_mod, "_get_similar_tracks", return_value=tracks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("Band", "Found")
def test_lastfm_empty_falls_to_musicbrainz(self):
"""Falls back to MusicBrainz when Last.fm returns nothing."""
bot = _FakeBot(api_key="test-key")
mb_picks = [{"artist": "MB Artist", "title": "MB Song"}]
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB Artist", "MB Song")
def test_no_api_key_uses_musicbrainz(self):
"""Skips Last.fm when no API key, goes straight to MusicBrainz."""
bot = _FakeBot(api_key="")
mb_picks = [{"artist": "MB Band", "title": "MB Track"}]
with patch.dict("os.environ", {}, clear=True), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB Band", "MB Track")
def test_both_fail_returns_none(self):
"""Returns None when both Last.fm and MusicBrainz fail."""
bot = _FakeBot(api_key="test-key")
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch("plugins._musicbrainz.mb_search_artist", return_value=None):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result is None
def test_no_artist_returns_none(self):
"""Returns None when title has no artist component."""
bot = _FakeBot(api_key="test-key")
result = asyncio.run(
_mod.discover_similar(bot, "Lateralus"),
)
assert result is None
def test_musicbrainz_import_error_handled(self):
"""Gracefully handles import error for _musicbrainz module."""
bot = _FakeBot(api_key="test-key")
with patch.object(_mod, "_get_similar_tracks", return_value=[]), \
patch.dict("sys.modules", {"plugins._musicbrainz": None}):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result is None
def test_lastfm_exception_falls_to_musicbrainz(self):
"""Last.fm exception triggers MusicBrainz fallback."""
bot = _FakeBot(api_key="test-key")
mb_picks = [{"artist": "Fallback", "title": "Song"}]
with patch.object(_mod, "_get_similar_tracks",
side_effect=Exception("API down")), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("Fallback", "Song")
def test_lastfm_pick_missing_name_falls_to_musicbrainz(self):
"""Falls to MB when Last.fm result has empty artist/title."""
bot = _FakeBot(api_key="test-key")
tracks = [{"name": "", "artist": {"name": ""}, "match": "0.9"}]
mb_picks = [{"artist": "MB", "title": "Track"}]
with patch.object(_mod, "_get_similar_tracks", return_value=tracks), \
patch("plugins._musicbrainz.mb_search_artist", return_value="mbid"), \
patch("plugins._musicbrainz.mb_artist_tags", return_value=["rock"]), \
patch("plugins._musicbrainz.mb_find_similar_recordings",
return_value=mb_picks):
result = asyncio.run(
_mod.discover_similar(bot, "Tool - Lateralus"),
)
assert result == ("MB", "Track")

498
tests/test_mumble_admin.py Normal file
View File

@@ -0,0 +1,498 @@
"""Tests for the mumble_admin plugin."""
import asyncio
import importlib.util
from dataclasses import dataclass, field
from unittest.mock import MagicMock
# -- Load plugin module directly ---------------------------------------------
_spec = importlib.util.spec_from_file_location(
"mumble_admin", "plugins/mumble_admin.py",
)
_mod = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
cmd_mu = _mod.cmd_mu
_find_user = _mod._find_user
_find_channel = _mod._find_channel
_channel_name = _mod._channel_name
# -- Fakes -------------------------------------------------------------------
@dataclass
class _FakeMessage:
text: str = ""
nick: str = "admin"
prefix: str = "admin"
target: str = "0"
is_channel: bool = True
params: list[str] = field(default_factory=list)
class _FakeRegistry:
_bots: dict = field(default_factory=dict)
def __init__(self):
self._bots = {}
class _FakeBot:
def __init__(self, users=None, channels=None):
self.registry = _FakeRegistry()
self._mumble = MagicMock()
if users is not None:
self._mumble.users = users
else:
self._mumble.users = {}
if channels is not None:
self._mumble.channels = channels
self._replies: list[str] = []
async def reply(self, message, text):
self._replies.append(text)
async def send(self, target, text):
self._replies.append(text)
def _make_user(name, channel_id=0, mute=False, deaf=False,
self_mute=False, self_deaf=False):
"""Create a fake pymumble user (dict with methods)."""
u = MagicMock()
u.__getitem__ = lambda s, k: {
"name": name,
"channel_id": channel_id,
"mute": mute,
"deaf": deaf,
"self_mute": self_mute,
"self_deaf": self_deaf,
}[k]
u.get = lambda k, d=None: {
"name": name,
"channel_id": channel_id,
"mute": mute,
"deaf": deaf,
"self_mute": self_mute,
"self_deaf": self_deaf,
}.get(k, d)
return u
def _make_channel(name, channel_id=0, parent=0):
"""Create a fake pymumble channel (dict with methods)."""
c = MagicMock()
c.__getitem__ = lambda s, k: {
"name": name,
"channel_id": channel_id,
"parent": parent,
}[k]
c.get = lambda k, d=None: {
"name": name,
"channel_id": channel_id,
"parent": parent,
}.get(k, d)
return c
# -- TestFindUser ------------------------------------------------------------
class TestFindUser:
def test_case_insensitive(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
assert _find_user(bot, "alice") is alice
assert _find_user(bot, "ALICE") is alice
assert _find_user(bot, "Alice") is alice
def test_not_found(self):
bot = _FakeBot(users={1: _make_user("Alice")})
assert _find_user(bot, "Bob") is None
def test_no_mumble(self):
bot = _FakeBot()
bot._mumble = None
assert _find_user(bot, "anyone") is None
# -- TestFindChannel ---------------------------------------------------------
class TestFindChannel:
def test_case_insensitive(self):
lobby = _make_channel("Lobby", channel_id=0)
bot = _FakeBot(channels={0: lobby})
assert _find_channel(bot, "lobby") is lobby
assert _find_channel(bot, "LOBBY") is lobby
def test_not_found(self):
bot = _FakeBot(channels={0: _make_channel("Lobby")})
assert _find_channel(bot, "AFK") is None
def test_no_mumble(self):
bot = _FakeBot()
bot._mumble = None
assert _find_channel(bot, "any") is None
# -- TestChannelName ---------------------------------------------------------
class TestChannelName:
def test_resolves(self):
lobby = _make_channel("Lobby", channel_id=0)
bot = _FakeBot(channels={0: lobby})
assert _channel_name(bot, 0) == "Lobby"
def test_missing_returns_id(self):
bot = _FakeBot(channels={})
assert _channel_name(bot, 42) == "42"
def test_no_mumble(self):
bot = _FakeBot()
bot._mumble = None
assert _channel_name(bot, 5) == "5"
# -- TestDispatch ------------------------------------------------------------
class TestDispatch:
def test_no_args_shows_usage(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu")
asyncio.run(cmd_mu(bot, msg))
assert len(bot._replies) == 1
assert "Usage" in bot._replies[0]
def test_unknown_sub_shows_usage(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu bogus")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
def test_valid_sub_routes(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu kick Alice")
asyncio.run(cmd_mu(bot, msg))
alice.kick.assert_called_once_with("")
assert "Kicked" in bot._replies[0]
# -- TestKick ----------------------------------------------------------------
class TestKick:
def test_kick_user(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu kick Alice")
asyncio.run(cmd_mu(bot, msg))
alice.kick.assert_called_once_with("")
assert "Kicked Alice" in bot._replies[0]
def test_kick_with_reason(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu kick Alice being rude")
asyncio.run(cmd_mu(bot, msg))
alice.kick.assert_called_once_with("being rude")
def test_kick_user_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu kick Ghost")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_kick_no_args(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu kick")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
# -- TestBan -----------------------------------------------------------------
class TestBan:
def test_ban_user(self):
bob = _make_user("Bob")
bot = _FakeBot(users={1: bob})
msg = _FakeMessage(text="!mu ban Bob")
asyncio.run(cmd_mu(bot, msg))
bob.ban.assert_called_once_with("")
assert "Banned Bob" in bot._replies[0]
def test_ban_with_reason(self):
bob = _make_user("Bob")
bot = _FakeBot(users={1: bob})
msg = _FakeMessage(text="!mu ban Bob spamming")
asyncio.run(cmd_mu(bot, msg))
bob.ban.assert_called_once_with("spamming")
def test_ban_user_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu ban Ghost")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
# -- TestMuteUnmute ----------------------------------------------------------
class TestMuteUnmute:
def test_mute(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu mute Alice")
asyncio.run(cmd_mu(bot, msg))
alice.mute.assert_called_once()
assert "Muted" in bot._replies[0]
def test_unmute(self):
alice = _make_user("Alice", mute=True)
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu unmute Alice")
asyncio.run(cmd_mu(bot, msg))
alice.unmute.assert_called_once()
assert "Unmuted" in bot._replies[0]
def test_mute_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu mute Nobody")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_unmute_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu unmute Nobody")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
# -- TestDeafenUndeafen ------------------------------------------------------
class TestDeafenUndeafen:
def test_deafen(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu deafen Alice")
asyncio.run(cmd_mu(bot, msg))
alice.deafen.assert_called_once()
assert "Deafened" in bot._replies[0]
def test_undeafen(self):
alice = _make_user("Alice", deaf=True)
bot = _FakeBot(users={1: alice})
msg = _FakeMessage(text="!mu undeafen Alice")
asyncio.run(cmd_mu(bot, msg))
alice.undeafen.assert_called_once()
assert "Undeafened" in bot._replies[0]
def test_deafen_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu deafen Nobody")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_undeafen_not_found(self):
bot = _FakeBot(users={})
msg = _FakeMessage(text="!mu undeafen Nobody")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
# -- TestMove ----------------------------------------------------------------
class TestMove:
def test_move_user(self):
alice = _make_user("Alice", channel_id=0)
afk = _make_channel("AFK", channel_id=5)
bot = _FakeBot(users={1: alice}, channels={0: _make_channel("Root"), 5: afk})
msg = _FakeMessage(text="!mu move Alice AFK")
asyncio.run(cmd_mu(bot, msg))
alice.move_in.assert_called_once_with(5)
assert "Moved Alice to AFK" in bot._replies[0]
def test_move_user_not_found(self):
bot = _FakeBot(users={}, channels={5: _make_channel("AFK", channel_id=5)})
msg = _FakeMessage(text="!mu move Ghost AFK")
asyncio.run(cmd_mu(bot, msg))
assert "user not found" in bot._replies[0].lower()
def test_move_channel_not_found(self):
alice = _make_user("Alice")
bot = _FakeBot(users={1: alice}, channels={})
msg = _FakeMessage(text="!mu move Alice Nowhere")
asyncio.run(cmd_mu(bot, msg))
assert "channel not found" in bot._replies[0].lower()
def test_move_missing_args(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu move Alice")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
# -- TestUsers ---------------------------------------------------------------
class TestUsers:
def test_list_users(self):
alice = _make_user("Alice", channel_id=0)
bob = _make_user("Bob", channel_id=0, self_mute=True)
lobby = _make_channel("Lobby", channel_id=0)
bot = _FakeBot(users={1: alice, 2: bob}, channels={0: lobby})
msg = _FakeMessage(text="!mu users")
asyncio.run(cmd_mu(bot, msg))
reply = bot._replies[0]
assert "2 user(s)" in reply
assert "Alice" in reply
assert "Bob" in reply
assert "muted" in reply
def test_list_with_bots(self):
alice = _make_user("Alice", channel_id=0)
derp = _make_user("derp", channel_id=0)
lobby = _make_channel("Lobby", channel_id=0)
bot = _FakeBot(users={1: alice, 2: derp}, channels={0: lobby})
bot.registry._bots = {"derp": MagicMock()}
msg = _FakeMessage(text="!mu users")
asyncio.run(cmd_mu(bot, msg))
reply = bot._replies[0]
assert "bot" in reply
assert "2 user(s)" in reply
def test_deaf_flag(self):
alice = _make_user("Alice", channel_id=0, self_deaf=True)
lobby = _make_channel("Lobby", channel_id=0)
bot = _FakeBot(users={1: alice}, channels={0: lobby})
msg = _FakeMessage(text="!mu users")
asyncio.run(cmd_mu(bot, msg))
assert "deaf" in bot._replies[0]
# -- TestChannels ------------------------------------------------------------
class TestChannels:
def test_list_channels(self):
lobby = _make_channel("Lobby", channel_id=0)
afk = _make_channel("AFK", channel_id=1)
alice = _make_user("Alice", channel_id=0)
bot = _FakeBot(users={1: alice}, channels={0: lobby, 1: afk})
msg = _FakeMessage(text="!mu channels")
asyncio.run(cmd_mu(bot, msg))
reply = bot._replies[0]
assert "Lobby (1)" in reply
assert "AFK (0)" in reply
# -- TestMkchan --------------------------------------------------------------
class TestMkchan:
def test_create_channel(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu mkchan Gaming")
asyncio.run(cmd_mu(bot, msg))
bot._mumble.channels.new_channel.assert_called_once_with(
0, "Gaming", temporary=False,
)
assert "Created" in bot._replies[0]
def test_create_temp_channel(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu mkchan Gaming temp")
asyncio.run(cmd_mu(bot, msg))
bot._mumble.channels.new_channel.assert_called_once_with(
0, "Gaming", temporary=True,
)
assert "temporary" in bot._replies[0]
def test_missing_name(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu mkchan")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
# -- TestRmchan --------------------------------------------------------------
class TestRmchan:
def test_remove_channel(self):
afk = _make_channel("AFK", channel_id=5)
bot = _FakeBot(channels={5: afk})
msg = _FakeMessage(text="!mu rmchan AFK")
asyncio.run(cmd_mu(bot, msg))
afk.remove.assert_called_once()
assert "Removed" in bot._replies[0]
def test_channel_not_found(self):
bot = _FakeBot(channels={})
msg = _FakeMessage(text="!mu rmchan Nowhere")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_missing_args(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu rmchan")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
# -- TestRename --------------------------------------------------------------
class TestRename:
def test_rename_channel(self):
afk = _make_channel("AFK", channel_id=5)
bot = _FakeBot(channels={5: afk})
msg = _FakeMessage(text="!mu rename AFK Chill")
asyncio.run(cmd_mu(bot, msg))
afk.rename_channel.assert_called_once_with("Chill")
assert "Renamed" in bot._replies[0]
def test_channel_not_found(self):
bot = _FakeBot(channels={})
msg = _FakeMessage(text="!mu rename Nowhere New")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_missing_args(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu rename AFK")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]
# -- TestDesc ----------------------------------------------------------------
class TestDesc:
def test_set_description(self):
afk = _make_channel("AFK", channel_id=5)
bot = _FakeBot(channels={5: afk})
msg = _FakeMessage(text="!mu desc AFK Away from keyboard")
asyncio.run(cmd_mu(bot, msg))
afk.set_channel_description.assert_called_once_with("Away from keyboard")
assert "description" in bot._replies[0].lower()
def test_channel_not_found(self):
bot = _FakeBot(channels={})
msg = _FakeMessage(text="!mu desc Nowhere some text")
asyncio.run(cmd_mu(bot, msg))
assert "not found" in bot._replies[0].lower()
def test_missing_args(self):
bot = _FakeBot()
msg = _FakeMessage(text="!mu desc AFK")
asyncio.run(cmd_mu(bot, msg))
assert "Usage" in bot._replies[0]

View File

@@ -956,7 +956,7 @@ class TestDuckCommand:
msg = _Msg(text="!duck")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("Duck:" in r for r in bot.replied)
assert any("floor=1%" in r for r in bot.replied)
assert any("floor=2%" in r for r in bot.replied)
assert any("restore=30s" in r for r in bot.replied)
def test_toggle_on(self):
@@ -2362,3 +2362,245 @@ class TestKeptRepair:
stored = json.loads(raw)
assert stored["filename"] == "song.webm"
assert (music_dir / "song.webm").is_file()
# ---------------------------------------------------------------------------
# TestAutoplayDiscovery
# ---------------------------------------------------------------------------
class TestAutoplayDiscovery:
"""Tests for the discovery integration in _play_loop autoplay."""
def test_config_defaults(self):
"""Default discover/discover_ratio values are set."""
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["discover"] is True
assert ps["discover_ratio"] == 3
def test_config_from_toml(self):
"""Config values are read from bot config."""
bot = _FakeBot()
bot.config = {"music": {"discover": False, "discover_ratio": 5}}
# Reset pstate so _ps re-reads config
bot._pstate.clear()
ps = _mod._ps(bot)
assert ps["discover"] is False
assert ps["discover_ratio"] == 5
def test_discovery_triggers_on_ratio(self, tmp_path):
"""Discovery is attempted when autoplay_count is a multiple of ratio."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1 # trigger every pick
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
# Seed history so discovery has something to reference
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
# Set up kept tracks for fallback pool
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("Deftones", "Change")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
resolved = [("https://youtube.com/watch?v=x", "Deftones - Change")]
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_resolve_tracks", return_value=resolved), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
# Let it pick a track, then cancel
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert len(discover_called) >= 1
assert discover_called[0] == "Tool - Lateralus"
def test_discovery_disabled(self, tmp_path):
"""Discovery is skipped when discover=False."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = False
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("X", "Y")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert discover_called == []
def test_discovery_dedup(self):
"""Same discovered track is not resolved twice (dedup by seen set)."""
# Unit-test the dedup logic directly: simulate the set-based
# deduplication that _play_loop uses with _discover_seen.
_discover_seen: set[str] = set()
def _would_resolve(artist: str, title: str) -> bool:
key = f"{artist.lower()}:{title.lower()}"
if key in _discover_seen:
return False
_discover_seen.add(key)
return True
assert _would_resolve("Deftones", "Change") is True
assert _would_resolve("Deftones", "Change") is False
assert _would_resolve("deftones", "change") is False
assert _would_resolve("Tool", "Sober") is True
def test_discovery_fallback_to_kept(self, tmp_path):
"""Falls back to kept deck when discovery returns None."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = [
_mod._Track(url="x", title="Tool - Lateralus", requester="a"),
]
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
async def fake_discover(b, title):
return None
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
queued_titles = []
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
# Check what was queued -- should be kept track, not discovered
if ps.get("current"):
queued_titles.append(ps["current"].title)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
# The kept track should have been used as fallback
if queued_titles:
assert queued_titles[0] == "Kept Track"
def test_no_history_skips_discovery(self, tmp_path):
"""Discovery is skipped when history is empty."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["autoplay"] = True
ps["discover"] = True
ps["discover_ratio"] = 1
ps["autoplay_cooldown"] = 0
ps["duck_silence"] = 0
ps["history"] = []
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Kept Track",
"filename": "a.opus", "id": 1,
}))
discover_called = []
async def fake_discover(b, title):
discover_called.append(title)
return ("X", "Y")
lastfm_mod = MagicMock()
lastfm_mod.discover_similar = fake_discover
bot.registry._modules = {"lastfm": lastfm_mod}
async def _run():
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
task = asyncio.create_task(
_mod._play_loop(bot, seek=0.0, fade_in=False),
)
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_run())
assert discover_called == []

310
tests/test_musicbrainz.py Normal file
View File

@@ -0,0 +1,310 @@
"""Tests for the MusicBrainz API helper module."""
import importlib.util
import json
import sys
import time
from io import BytesIO
from unittest.mock import MagicMock, patch
# -- Load module directly ----------------------------------------------------
_spec = importlib.util.spec_from_file_location(
"_musicbrainz", "plugins/_musicbrainz.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["_musicbrainz"] = _mod
_spec.loader.exec_module(_mod)
# -- Helpers -----------------------------------------------------------------
def _make_resp(data: dict) -> MagicMock:
"""Create a fake HTTP response with JSON body."""
resp = MagicMock()
resp.read.return_value = json.dumps(data).encode()
return resp
# ---------------------------------------------------------------------------
# TestMbRequest
# ---------------------------------------------------------------------------
class TestMbRequest:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_parsed_json(self):
resp = _make_resp({"status": "ok"})
with patch("derp.http.urlopen", return_value=resp):
result = _mod._mb_request("artist", {"query": "Tool"})
assert result == {"status": "ok"}
def test_rate_delay_enforced(self):
"""Second call within rate interval triggers sleep."""
_mod._last_request = time.monotonic()
resp = _make_resp({})
slept = []
with patch("derp.http.urlopen", return_value=resp), \
patch.object(_mod.time, "sleep", side_effect=slept.append), \
patch.object(_mod.time, "monotonic", return_value=_mod._last_request + 0.2):
_mod._mb_request("artist", {"query": "X"})
assert len(slept) == 1
assert slept[0] > 0
def test_no_delay_when_interval_elapsed(self):
"""No sleep when enough time has passed since last request."""
_mod._last_request = time.monotonic() - 5.0
resp = _make_resp({})
with patch("derp.http.urlopen", return_value=resp), \
patch.object(_mod.time, "sleep") as mock_sleep:
_mod._mb_request("artist", {"query": "X"})
mock_sleep.assert_not_called()
def test_returns_empty_on_error(self):
with patch("derp.http.urlopen", side_effect=ConnectionError("fail")):
result = _mod._mb_request("artist", {"query": "X"})
assert result == {}
def test_updates_last_request_on_success(self):
_mod._last_request = 0.0
resp = _make_resp({})
with patch("derp.http.urlopen", return_value=resp):
_mod._mb_request("test")
assert _mod._last_request > 0
def test_updates_last_request_on_error(self):
_mod._last_request = 0.0
with patch("derp.http.urlopen", side_effect=Exception("boom")):
_mod._mb_request("test")
assert _mod._last_request > 0
def test_none_params(self):
"""Handles None params without error."""
resp = _make_resp({"ok": True})
with patch("derp.http.urlopen", return_value=resp):
result = _mod._mb_request("test", None)
assert result == {"ok": True}
# ---------------------------------------------------------------------------
# TestMbSearchArtist
# ---------------------------------------------------------------------------
class TestMbSearchArtist:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_mbid(self):
data = {"artists": [{"id": "abc-123", "name": "Tool", "score": 100}]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_search_artist("Tool")
assert result == "abc-123"
def test_returns_none_no_results(self):
with patch.object(_mod, "_mb_request", return_value={"artists": []}):
assert _mod.mb_search_artist("Unknown") is None
def test_returns_none_on_empty_response(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_search_artist("X") is None
def test_returns_none_low_score(self):
"""Rejects matches with score below 50."""
data = {"artists": [{"id": "low", "name": "Mismatch", "score": 30}]}
with patch.object(_mod, "_mb_request", return_value=data):
assert _mod.mb_search_artist("Tool") is None
def test_returns_none_on_error(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_search_artist("Error") is None
def test_accepts_high_score(self):
data = {"artists": [{"id": "abc", "name": "Tool", "score": 85}]}
with patch.object(_mod, "_mb_request", return_value=data):
assert _mod.mb_search_artist("Tool") == "abc"
# ---------------------------------------------------------------------------
# TestMbArtistTags
# ---------------------------------------------------------------------------
class TestMbArtistTags:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_sorted_top_5(self):
data = {"tags": [
{"name": "rock", "count": 50},
{"name": "metal", "count": 100},
{"name": "prog", "count": 80},
{"name": "alternative", "count": 60},
{"name": "hard rock", "count": 40},
{"name": "grunge", "count": 30},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid-123")
assert len(result) == 5
assert result[0] == "metal"
assert result[1] == "prog"
assert result[2] == "alternative"
assert result[3] == "rock"
assert result[4] == "hard rock"
def test_empty_tags(self):
with patch.object(_mod, "_mb_request", return_value={"tags": []}):
assert _mod.mb_artist_tags("mbid") == []
def test_no_tags_key(self):
with patch.object(_mod, "_mb_request", return_value={}):
assert _mod.mb_artist_tags("mbid") == []
def test_skips_nameless_tags(self):
data = {"tags": [
{"name": "rock", "count": 50},
{"count": 100}, # no name
{"name": "", "count": 80}, # empty name
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid")
assert result == ["rock"]
def test_fewer_than_5_tags(self):
data = {"tags": [{"name": "jazz", "count": 10}]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_artist_tags("mbid")
assert result == ["jazz"]
# ---------------------------------------------------------------------------
# TestMbFindSimilarRecordings
# ---------------------------------------------------------------------------
class TestMbFindSimilarRecordings:
def setup_method(self):
_mod._last_request = 0.0
def test_returns_dicts(self):
data = {"recordings": [
{
"title": "Song A",
"artist-credit": [{"name": "Other Artist"}],
},
{
"title": "Song B",
"artist-credit": [{"name": "Another Band"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock", "metal"],
)
assert len(result) == 2
assert result[0] == {"artist": "Other Artist", "title": "Song A"}
assert result[1] == {"artist": "Another Band", "title": "Song B"}
def test_excludes_original_artist(self):
data = {"recordings": [
{
"title": "Own Song",
"artist-credit": [{"name": "Tool"}],
},
{
"title": "Other Song",
"artist-credit": [{"name": "Deftones"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert len(result) == 1
assert result[0]["artist"] == "Deftones"
def test_excludes_original_artist_case_insensitive(self):
data = {"recordings": [
{
"title": "Song",
"artist-credit": [{"name": "TOOL"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_deduplicates(self):
data = {"recordings": [
{
"title": "Song A",
"artist-credit": [{"name": "Band X"}],
},
{
"title": "Song A",
"artist-credit": [{"name": "Band X"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert len(result) == 1
def test_empty_tags(self):
result = _mod.mb_find_similar_recordings("Tool", [])
assert result == []
def test_no_recordings(self):
with patch.object(_mod, "_mb_request", return_value={"recordings": []}):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_empty_response(self):
with patch.object(_mod, "_mb_request", return_value={}):
result = _mod.mb_find_similar_recordings(
"Tool", ["rock"],
)
assert result == []
def test_skips_missing_title(self):
data = {"recordings": [
{
"title": "",
"artist-credit": [{"name": "Band"}],
},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert result == []
def test_skips_missing_artist_credit(self):
data = {"recordings": [
{"title": "Song", "artist-credit": []},
{"title": "Song2"},
]}
with patch.object(_mod, "_mb_request", return_value=data):
result = _mod.mb_find_similar_recordings(
"Other", ["rock"],
)
assert result == []
def test_uses_top_two_tags(self):
"""Query should use at most 2 tags."""
with patch.object(_mod, "_mb_request", return_value={}) as mock_req:
_mod.mb_find_similar_recordings(
"Tool", ["rock", "metal", "prog"],
)
call_args = mock_req.call_args
query = call_args[1]["query"] if "query" in (call_args[1] or {}) else call_args[0][1].get("query", "")
# Verify the query contains both tag references
assert "rock" in query or "metal" in query