feat: fade-out on skip/stop/prev, song metadata on keep
Some checks failed
Some checks failed
- Add fade_step parameter to stream_audio for fast volume ramps - _fade_and_cancel helper: smooth ~0.8s fade before track switch - !skip, !stop, !seek now fade out instead of cutting instantly - !prev command: go back to previous track (10-track history stack) - !keep fetches title/artist/duration via yt-dlp, stores in bot.state - !kept displays metadata (title, artist, duration, file size) - !kept clear also removes stored metadata - 29 new tests for fade, prev, history, keep metadata Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -549,14 +549,19 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
|
||||
!play <url|playlist> # Play audio (YouTube, SoundCloud, etc.)
|
||||
!play <playlist-url> # Playlist tracks expanded into queue
|
||||
!play classical music # YouTube search, random pick from top 10
|
||||
!stop # Stop playback, clear queue
|
||||
!skip # Skip current track
|
||||
!stop # Stop playback, clear queue (fades out)
|
||||
!skip # Skip current track (fades out)
|
||||
!prev # Go back to previous track (fades out)
|
||||
!seek 1:30 # Seek to position (also +30, -30)
|
||||
!resume # Resume last stopped/skipped track
|
||||
!queue # Show queue
|
||||
!queue <url> # Add to queue (alias for !play)
|
||||
!np # Now playing
|
||||
!volume # Show current volume
|
||||
!volume 75 # Set volume (0-100, default 50)
|
||||
!keep # Keep current file + save metadata
|
||||
!kept # List kept files with metadata
|
||||
!kept clear # Delete all kept files + metadata
|
||||
!duck # Show ducking status
|
||||
!duck on # Enable voice ducking
|
||||
!duck off # Disable voice ducking
|
||||
@@ -567,7 +572,9 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
|
||||
|
||||
Requires: `yt-dlp`, `ffmpeg`, `libopus` on the host.
|
||||
Max 50 tracks in queue. Playlists auto-expand; excess truncated at limit.
|
||||
Volume ramps smoothly over ~1s (no abrupt jumps mid-playback).
|
||||
Skip/stop/prev/seek fade out smoothly (~0.8s); volume ramps over ~1s.
|
||||
`!prev` pops from a 10-track history stack (populated on skip/finish).
|
||||
`!keep` fetches title/artist/duration via yt-dlp and stores in `bot.state`.
|
||||
`!resume` restores position across restarts (persisted via `bot.state`).
|
||||
Auto-resumes on reconnect if channel is silent (waits up to 60s for silence).
|
||||
Mumble-only: `!play` replies with error on other adapters, others silently no-op.
|
||||
|
||||
@@ -1618,16 +1618,17 @@ and voice transmission.
|
||||
```
|
||||
!play <url|playlist> Play audio or add to queue (playlists expanded)
|
||||
!play <query> Search YouTube, play a random result
|
||||
!stop Stop playback, clear queue
|
||||
!skip Skip current track
|
||||
!stop Stop playback, clear queue (fade-out)
|
||||
!skip Skip current track (fade-out)
|
||||
!prev Go back to the previous track (fade-out)
|
||||
!seek <offset> Seek to position (1:30, 90, +30, -30)
|
||||
!resume Resume last stopped/skipped track from saved position
|
||||
!queue Show queue
|
||||
!queue <url> Add to queue (alias for !play)
|
||||
!np Now playing
|
||||
!volume [0-100] Get/set volume (persisted across restarts)
|
||||
!keep Keep current track's audio file after playback
|
||||
!kept [clear] List kept files or clear all
|
||||
!keep Keep current track's audio file (with metadata)
|
||||
!kept [clear] List kept files with metadata, or clear all
|
||||
!testtone Play 3-second 440Hz test tone
|
||||
```
|
||||
|
||||
@@ -1636,7 +1637,8 @@ and voice transmission.
|
||||
and one is picked randomly
|
||||
- Playlists are expanded into individual tracks; excess tracks are
|
||||
truncated at the queue limit
|
||||
- Volume changes ramp smoothly over ~1s (no abrupt jumps)
|
||||
- `!skip`, `!stop`, `!prev`, and `!seek` fade out smoothly (~0.8s) before
|
||||
switching tracks; volume changes ramp smoothly over ~1s (no abrupt jumps)
|
||||
- Default volume: 50%; persisted via `bot.state` across restarts
|
||||
- Titles resolved via `yt-dlp --flat-playlist` before playback
|
||||
- Audio is downloaded before playback (`data/music/`); files are deleted
|
||||
@@ -1648,6 +1650,8 @@ and voice transmission.
|
||||
other music commands silently no-op
|
||||
- Playback runs as an asyncio background task; the bot remains responsive
|
||||
to text commands during streaming
|
||||
- `!prev` returns to the last-played track; up to 10 tracks are kept in a
|
||||
per-session history stack (populated on skip and natural track completion)
|
||||
- `!resume` continues from where playback was interrupted (`!stop`/`!skip`);
|
||||
position is persisted via `bot.state` and survives bot restarts
|
||||
|
||||
@@ -1742,9 +1746,11 @@ file (natural dedup).
|
||||
|
||||
- If download fails, playback falls back to streaming (`yt-dlp | ffmpeg`)
|
||||
- After a track finishes, the local file is automatically deleted
|
||||
- Use `!keep` during playback to preserve the file
|
||||
- Use `!kept` to list preserved files and their sizes
|
||||
- Use `!kept clear` to delete all preserved files
|
||||
- Use `!keep` during playback to preserve the file; metadata (title, artist,
|
||||
duration) is fetched via yt-dlp and stored in `bot.state`
|
||||
- Use `!kept` to list preserved files with metadata (title, artist, duration,
|
||||
file size)
|
||||
- Use `!kept clear` to delete all preserved files and their metadata
|
||||
- On cancel/error, files are not deleted (needed for `!resume`)
|
||||
|
||||
### Extra Mumble Bots
|
||||
|
||||
179
plugins/music.py
179
plugins/music.py
@@ -48,6 +48,9 @@ def _ps(bot):
|
||||
"duck_restore": cfg.get("duck_restore", 30),
|
||||
"duck_vol": None,
|
||||
"duck_task": None,
|
||||
"fade_vol": None,
|
||||
"fade_step": None,
|
||||
"history": [],
|
||||
"_watcher_task": None,
|
||||
})
|
||||
|
||||
@@ -184,6 +187,28 @@ def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, s
|
||||
_MUSIC_DIR = Path("data/music")
|
||||
|
||||
|
||||
def _fetch_metadata(url: str) -> dict:
|
||||
"""Fetch track metadata via yt-dlp. Blocking -- run in executor."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["yt-dlp", "--print", "title", "--print", "artist",
|
||||
"--print", "duration", "--no-warnings", "--no-download", url],
|
||||
capture_output=True, text=True, timeout=15,
|
||||
)
|
||||
lines = result.stdout.strip().splitlines()
|
||||
return {
|
||||
"title": lines[0] if len(lines) > 0 else "",
|
||||
"artist": lines[1] if len(lines) > 1 else "",
|
||||
"duration": (float(lines[2])
|
||||
if len(lines) > 2
|
||||
and lines[2].replace(".", "", 1).isdigit()
|
||||
else 0),
|
||||
}
|
||||
except Exception:
|
||||
log.warning("music: metadata fetch failed for %s", url)
|
||||
return {"title": "", "artist": "", "duration": 0}
|
||||
|
||||
|
||||
def _download_track(url: str, track_id: str) -> Path | None:
|
||||
"""Download audio to data/music/. Blocking -- run in executor."""
|
||||
_MUSIC_DIR.mkdir(parents=True, exist_ok=True)
|
||||
@@ -372,6 +397,8 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None:
|
||||
while ps["queue"]:
|
||||
track = ps["queue"].pop(0)
|
||||
ps["current"] = track
|
||||
ps["fade_vol"] = None
|
||||
ps["fade_step"] = None
|
||||
|
||||
done = asyncio.Event()
|
||||
ps["done_event"] = done
|
||||
@@ -403,13 +430,16 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None:
|
||||
await bot.stream_audio(
|
||||
source,
|
||||
volume=lambda: (
|
||||
ps["duck_vol"]
|
||||
ps["fade_vol"]
|
||||
if ps["fade_vol"] is not None
|
||||
else ps["duck_vol"]
|
||||
if ps["duck_vol"] is not None
|
||||
else ps["volume"]
|
||||
) / 100.0,
|
||||
on_done=done,
|
||||
seek=cur_seek,
|
||||
progress=progress,
|
||||
fade_step=lambda: ps.get("fade_step"),
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
elapsed = cur_seek + progress[0] * 0.02
|
||||
@@ -426,6 +456,12 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None:
|
||||
await done.wait()
|
||||
if progress[0] > 0:
|
||||
_clear_resume(bot)
|
||||
# Push finished track to history
|
||||
ps["history"].append(_Track(url=track.url, title=track.title,
|
||||
requester=track.requester,
|
||||
origin=track.origin))
|
||||
if len(ps["history"]) > _MAX_HISTORY:
|
||||
ps["history"].pop(0)
|
||||
_cleanup_track(track)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
@@ -437,6 +473,8 @@ async def _play_loop(bot, *, seek: float = 0.0) -> None:
|
||||
ps["task"] = None
|
||||
ps["duck_vol"] = None
|
||||
ps["duck_task"] = None
|
||||
ps["fade_vol"] = None
|
||||
ps["fade_step"] = None
|
||||
ps["progress"] = None
|
||||
ps["cur_seek"] = 0.0
|
||||
|
||||
@@ -452,6 +490,29 @@ def _ensure_loop(bot, *, seek: float = 0.0) -> None:
|
||||
)
|
||||
|
||||
|
||||
_MAX_HISTORY = 10
|
||||
|
||||
|
||||
async def _fade_and_cancel(bot, duration: float = 0.8) -> None:
|
||||
"""Fade audio to zero over ``duration`` seconds, then cancel the task."""
|
||||
ps = _ps(bot)
|
||||
task = ps.get("task")
|
||||
if not task or task.done():
|
||||
return
|
||||
# Fast ramp: reach 0 from any volume in ~duration
|
||||
# e.g. 0.8s = 40 frames at 20ms, step = 1.0/40 = 0.025
|
||||
ps["fade_step"] = 1.0 / (duration / 0.02)
|
||||
ps["fade_vol"] = 0
|
||||
await asyncio.sleep(duration)
|
||||
ps["fade_step"] = None
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
|
||||
# -- Commands ----------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -536,17 +597,15 @@ async def cmd_stop(bot, message):
|
||||
|
||||
task = ps.get("task")
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await _fade_and_cancel(bot)
|
||||
else:
|
||||
ps["current"] = None
|
||||
ps["task"] = None
|
||||
ps["done_event"] = None
|
||||
ps["duck_vol"] = None
|
||||
ps["duck_task"] = None
|
||||
ps["fade_vol"] = None
|
||||
ps["fade_step"] = None
|
||||
ps["progress"] = None
|
||||
ps["cur_seek"] = 0.0
|
||||
|
||||
@@ -602,14 +661,14 @@ async def cmd_skip(bot, message):
|
||||
return
|
||||
|
||||
skipped = ps["current"]
|
||||
# Push skipped track to history
|
||||
ps["history"].append(_Track(url=skipped.url, title=skipped.title,
|
||||
requester=skipped.requester,
|
||||
origin=skipped.origin))
|
||||
if len(ps["history"]) > _MAX_HISTORY:
|
||||
ps["history"].pop(0)
|
||||
|
||||
task = ps.get("task")
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await _fade_and_cancel(bot)
|
||||
|
||||
if ps["queue"]:
|
||||
_ensure_loop(bot)
|
||||
@@ -621,6 +680,33 @@ async def cmd_skip(bot, message):
|
||||
await bot.reply(message, "Skipped, queue empty")
|
||||
|
||||
|
||||
@command("prev", help="Music: !prev -- play previous track")
|
||||
async def cmd_prev(bot, message):
|
||||
"""Go back to the previous track."""
|
||||
if not _is_mumble(bot):
|
||||
return
|
||||
|
||||
ps = _ps(bot)
|
||||
if not ps["history"]:
|
||||
await bot.reply(message, "No previous track")
|
||||
return
|
||||
|
||||
prev = ps["history"].pop()
|
||||
|
||||
# Re-queue current track so it plays next after prev
|
||||
if ps["current"] is not None:
|
||||
ps["queue"].insert(0, _Track(url=ps["current"].url,
|
||||
title=ps["current"].title,
|
||||
requester=ps["current"].requester,
|
||||
origin=ps["current"].origin))
|
||||
|
||||
ps["queue"].insert(0, prev)
|
||||
|
||||
await _fade_and_cancel(bot)
|
||||
_ensure_loop(bot)
|
||||
await bot.reply(message, f"Previous: {_truncate(prev.title)}")
|
||||
|
||||
|
||||
@command("seek", help="Music: !seek <offset>")
|
||||
async def cmd_seek(bot, message):
|
||||
"""Seek to position in current track.
|
||||
@@ -666,14 +752,7 @@ async def cmd_seek(bot, message):
|
||||
# Re-insert current track at front of queue (local_path intact)
|
||||
ps["queue"].insert(0, track)
|
||||
|
||||
# Cancel the play loop and wait for cleanup
|
||||
task = ps.get("task")
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await _fade_and_cancel(bot)
|
||||
|
||||
_ensure_loop(bot, seek=target)
|
||||
await bot.reply(message, f"Seeking to {_fmt_time(target)}")
|
||||
@@ -875,7 +954,11 @@ async def cmd_duck(bot, message):
|
||||
|
||||
@command("keep", help="Music: !keep -- keep current track's audio file")
|
||||
async def cmd_keep(bot, message):
|
||||
"""Mark the current track's local file to keep after playback."""
|
||||
"""Mark the current track's local file to keep after playback.
|
||||
|
||||
Fetches metadata (title, artist, duration) and persists it alongside
|
||||
the filename so ``!kept`` can display useful information.
|
||||
"""
|
||||
if not _is_mumble(bot):
|
||||
await bot.reply(message, "Mumble-only feature")
|
||||
return
|
||||
@@ -889,12 +972,35 @@ async def cmd_keep(bot, message):
|
||||
await bot.reply(message, "No local file for current track")
|
||||
return
|
||||
track.keep = True
|
||||
await bot.reply(message, f"Keeping: {track.local_path.name}")
|
||||
filename = track.local_path.name
|
||||
|
||||
# Fetch metadata in background
|
||||
loop = asyncio.get_running_loop()
|
||||
meta = await loop.run_in_executor(None, _fetch_metadata, track.url)
|
||||
meta["filename"] = filename
|
||||
meta["url"] = track.url
|
||||
bot.state.set("music", f"keep:{filename}", json.dumps(meta))
|
||||
|
||||
# Build display string
|
||||
parts = []
|
||||
title = meta.get("title") or track.title
|
||||
artist = meta.get("artist", "")
|
||||
dur = meta.get("duration", 0)
|
||||
parts.append(_truncate(title))
|
||||
if artist and artist.lower() not in ("na", "unknown", ""):
|
||||
parts[0] += f" -- {artist}"
|
||||
if dur > 0:
|
||||
parts[0] += f" ({_fmt_time(dur)})"
|
||||
await bot.reply(message, f"Keeping: {parts[0]}")
|
||||
|
||||
|
||||
@command("kept", help="Music: !kept [clear] -- list or clear kept files")
|
||||
async def cmd_kept(bot, message):
|
||||
"""List or clear kept audio files in data/music/."""
|
||||
"""List or clear kept audio files in data/music/.
|
||||
|
||||
When metadata is available (from ``!keep``), displays title, artist,
|
||||
duration, and file size. Falls back to filename + size otherwise.
|
||||
"""
|
||||
if not _is_mumble(bot):
|
||||
await bot.reply(message, "Mumble-only feature")
|
||||
return
|
||||
@@ -907,6 +1013,10 @@ async def cmd_kept(bot, message):
|
||||
if f.is_file():
|
||||
f.unlink()
|
||||
count += 1
|
||||
# Clear stored metadata
|
||||
for key in bot.state.keys("music"):
|
||||
if key.startswith("keep:"):
|
||||
bot.state.delete("music", key)
|
||||
await bot.reply(message, f"Deleted {count} file(s)")
|
||||
return
|
||||
|
||||
@@ -919,9 +1029,24 @@ async def cmd_kept(bot, message):
|
||||
lines = [f"Kept files ({len(files)}):"]
|
||||
for f in files:
|
||||
size_mb = f.stat().st_size / (1024 * 1024)
|
||||
lines.append(f" {f.name} ({size_mb:.1f}MB)")
|
||||
for line in lines:
|
||||
await bot.reply(message, line)
|
||||
raw = bot.state.get("music", f"keep:{f.name}")
|
||||
if raw:
|
||||
try:
|
||||
meta = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
meta = {}
|
||||
title = meta.get("title", "")
|
||||
artist = meta.get("artist", "")
|
||||
dur = meta.get("duration", 0)
|
||||
label = _truncate(title) if title else f.name
|
||||
if artist and artist.lower() not in ("na", "unknown", ""):
|
||||
label += f" -- {artist}"
|
||||
if dur > 0:
|
||||
label += f" ({_fmt_time(dur)})"
|
||||
lines.append(f" {label} [{size_mb:.1f}MB]")
|
||||
else:
|
||||
lines.append(f" {f.name} ({size_mb:.1f}MB)")
|
||||
await bot.long_reply(message, lines, label="kept files")
|
||||
|
||||
|
||||
# -- Plugin lifecycle --------------------------------------------------------
|
||||
|
||||
@@ -521,6 +521,7 @@ class MumbleBot:
|
||||
on_done=None,
|
||||
seek: float = 0.0,
|
||||
progress: list | None = None,
|
||||
fade_step=None,
|
||||
) -> None:
|
||||
"""Stream audio from URL through yt-dlp|ffmpeg to voice channel.
|
||||
|
||||
@@ -534,7 +535,9 @@ class MumbleBot:
|
||||
``volume`` may be a float (static) or a callable returning float
|
||||
(dynamic, re-read each frame). ``seek`` skips into the track
|
||||
(seconds). ``progress`` is a mutable ``[0]`` list updated to the
|
||||
current frame count each frame.
|
||||
current frame count each frame. ``fade_step`` is an optional
|
||||
callable returning a float or None; when non-None it overrides
|
||||
the default ramp step for fast fades (e.g. skip/stop).
|
||||
"""
|
||||
if self._mumble is None:
|
||||
return
|
||||
@@ -589,19 +592,24 @@ class MumbleBot:
|
||||
_was_feeding = True
|
||||
|
||||
target = _get_vol()
|
||||
step = _max_step
|
||||
if fade_step is not None:
|
||||
fs = fade_step()
|
||||
if fs:
|
||||
step = fs
|
||||
if _cur_vol == target:
|
||||
# Fast path: flat scaling
|
||||
if target != 1.0:
|
||||
pcm = _scale_pcm(pcm, target)
|
||||
else:
|
||||
# Ramp toward target, clamped to _max_step per frame
|
||||
# Ramp toward target, clamped to step per frame
|
||||
diff = target - _cur_vol
|
||||
if abs(diff) <= _max_step:
|
||||
if abs(diff) <= step:
|
||||
next_vol = target
|
||||
elif diff > 0:
|
||||
next_vol = _cur_vol + _max_step
|
||||
next_vol = _cur_vol + step
|
||||
else:
|
||||
next_vol = _cur_vol - _max_step
|
||||
next_vol = _cur_vol - step
|
||||
pcm = _scale_pcm_ramp(pcm, _cur_vol, next_vol)
|
||||
_cur_vol = next_vol
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
@@ -53,6 +54,11 @@ class _FakeBot:
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def long_reply(self, message, lines: list[str], *,
|
||||
label: str = "") -> None:
|
||||
for line in lines:
|
||||
self.replied.append(line)
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return False
|
||||
|
||||
@@ -1358,3 +1364,295 @@ class TestVolumePersistence:
|
||||
asyncio.run(_mod.on_connected(bot))
|
||||
ps = _mod._ps(bot)
|
||||
assert ps["volume"] == 50 # default unchanged
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFadeAndCancel
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFadeAndCancel:
|
||||
def test_sets_fade_state(self):
|
||||
"""_fade_and_cancel sets fade_vol=0 and a fast fade_step."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
# Create a fake task that stays "running"
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
|
||||
async def _check():
|
||||
task = asyncio.create_task(_mod._fade_and_cancel(bot, duration=0.1))
|
||||
# Let the fade start
|
||||
await asyncio.sleep(0.02)
|
||||
assert ps["fade_vol"] == 0
|
||||
assert ps["fade_step"] is not None
|
||||
assert ps["fade_step"] > 0.01
|
||||
await task
|
||||
asyncio.run(_check())
|
||||
|
||||
def test_noop_when_no_task(self):
|
||||
"""_fade_and_cancel returns immediately if no task is running."""
|
||||
bot = _FakeBot()
|
||||
_mod._ps(bot)
|
||||
asyncio.run(_mod._fade_and_cancel(bot, duration=0.1))
|
||||
|
||||
def test_clears_fade_step_after_cancel(self):
|
||||
"""fade_step is reset to None after cancellation."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
asyncio.run(_mod._fade_and_cancel(bot, duration=0.1))
|
||||
assert ps["fade_step"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestPrevCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPrevCommand:
|
||||
def test_prev_no_history(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!prev")
|
||||
asyncio.run(_mod.cmd_prev(bot, msg))
|
||||
assert any("No previous" in r for r in bot.replied)
|
||||
|
||||
def test_prev_pops_history(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="a", title="Current", requester="x")
|
||||
ps["history"] = [
|
||||
_mod._Track(url="b", title="Previous", requester="y"),
|
||||
]
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!prev")
|
||||
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_prev(bot, msg))
|
||||
assert any("Previous" in r for r in bot.replied)
|
||||
# History should be empty (popped the only entry)
|
||||
assert len(ps["history"]) == 0
|
||||
# Queue should have: prev track, then current track
|
||||
assert len(ps["queue"]) == 2
|
||||
assert ps["queue"][0].title == "Previous"
|
||||
assert ps["queue"][1].title == "Current"
|
||||
|
||||
def test_prev_non_mumble(self):
|
||||
bot = _FakeBot(mumble=False)
|
||||
msg = _Msg(text="!prev")
|
||||
asyncio.run(_mod.cmd_prev(bot, msg))
|
||||
assert bot.replied == []
|
||||
|
||||
def test_prev_no_current_track(self):
|
||||
"""!prev with history but nothing currently playing."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["history"] = [
|
||||
_mod._Track(url="b", title="Previous", requester="y"),
|
||||
]
|
||||
msg = _Msg(text="!prev")
|
||||
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_prev(bot, msg))
|
||||
# Only the prev track in queue (no current to re-queue)
|
||||
assert len(ps["queue"]) == 1
|
||||
assert ps["queue"][0].title == "Previous"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestHistoryTracking
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHistoryTracking:
|
||||
def test_skip_pushes_to_history(self):
|
||||
"""Skipping a track adds it to history."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="a", title="First", requester="x")
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!skip")
|
||||
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
|
||||
asyncio.run(_mod.cmd_skip(bot, msg))
|
||||
assert len(ps["history"]) == 1
|
||||
assert ps["history"][0].title == "First"
|
||||
|
||||
def test_history_capped(self):
|
||||
"""History does not exceed _MAX_HISTORY entries."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
# Fill history to max
|
||||
for i in range(_mod._MAX_HISTORY):
|
||||
ps["history"].append(
|
||||
_mod._Track(url=f"u{i}", title=f"T{i}", requester="a"),
|
||||
)
|
||||
assert len(ps["history"]) == _mod._MAX_HISTORY
|
||||
# Skip another track, pushing to history
|
||||
ps["current"] = _mod._Track(url="new", title="New", requester="x")
|
||||
mock_task = MagicMock()
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!skip")
|
||||
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
|
||||
asyncio.run(_mod.cmd_skip(bot, msg))
|
||||
assert len(ps["history"]) == _mod._MAX_HISTORY
|
||||
assert ps["history"][-1].title == "New"
|
||||
|
||||
def test_ps_has_history(self):
|
||||
"""_ps initializes with empty history list."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
assert ps["history"] == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFadeState
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFadeState:
|
||||
def test_ps_fade_fields_initialized(self):
|
||||
"""_ps initializes fade_vol and fade_step to None."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
assert ps["fade_vol"] is None
|
||||
assert ps["fade_step"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestKeepMetadata
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestKeepMetadata:
|
||||
def test_keep_stores_metadata(self, tmp_path):
|
||||
"""!keep stores metadata JSON in bot.state."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
f = tmp_path / "abc123.opus"
|
||||
f.write_bytes(b"audio")
|
||||
track = _mod._Track(
|
||||
url="https://example.com/v", title="Test Song",
|
||||
requester="a", local_path=f,
|
||||
)
|
||||
ps["current"] = track
|
||||
msg = _Msg(text="!keep")
|
||||
meta = {"title": "My Song", "artist": "Artist", "duration": 195.0}
|
||||
with patch.object(_mod, "_fetch_metadata", return_value=meta):
|
||||
asyncio.run(_mod.cmd_keep(bot, msg))
|
||||
assert track.keep is True
|
||||
raw = bot.state.get("music", "keep:abc123.opus")
|
||||
assert raw is not None
|
||||
stored = json.loads(raw)
|
||||
assert stored["title"] == "My Song"
|
||||
assert stored["artist"] == "Artist"
|
||||
assert stored["duration"] == 195.0
|
||||
assert any("My Song" in r for r in bot.replied)
|
||||
assert any("Artist" in r for r in bot.replied)
|
||||
assert any("3:15" in r for r in bot.replied)
|
||||
|
||||
def test_keep_no_artist(self, tmp_path):
|
||||
"""!keep with empty artist omits the artist field."""
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
f = tmp_path / "def456.opus"
|
||||
f.write_bytes(b"audio")
|
||||
track = _mod._Track(
|
||||
url="https://example.com/v", title="Song",
|
||||
requester="a", local_path=f,
|
||||
)
|
||||
ps["current"] = track
|
||||
msg = _Msg(text="!keep")
|
||||
meta = {"title": "Song", "artist": "NA", "duration": 60.0}
|
||||
with patch.object(_mod, "_fetch_metadata", return_value=meta):
|
||||
asyncio.run(_mod.cmd_keep(bot, msg))
|
||||
# Should not contain "NA" as artist
|
||||
assert not any("NA" in r and "--" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestKeptMetadata
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestKeptMetadata:
|
||||
def test_kept_shows_metadata(self, tmp_path):
|
||||
"""!kept displays metadata from bot.state when available."""
|
||||
bot = _FakeBot()
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "abc123.opus").write_bytes(b"x" * 2048)
|
||||
bot.state.set("music", "keep:abc123.opus", json.dumps({
|
||||
"title": "Cool Song", "artist": "DJ Test", "duration": 225.0,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
msg = _Msg(text="!kept")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("Cool Song" in r for r in bot.replied)
|
||||
assert any("DJ Test" in r for r in bot.replied)
|
||||
assert any("3:45" in r for r in bot.replied)
|
||||
|
||||
def test_kept_fallback_no_metadata(self, tmp_path):
|
||||
"""!kept falls back to filename when no metadata stored."""
|
||||
bot = _FakeBot()
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "xyz789.webm").write_bytes(b"x" * 1024)
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
msg = _Msg(text="!kept")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("xyz789.webm" in r for r in bot.replied)
|
||||
|
||||
def test_kept_clear_removes_metadata(self, tmp_path):
|
||||
"""!kept clear also removes stored metadata."""
|
||||
bot = _FakeBot()
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "abc123.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:abc123.opus", json.dumps({
|
||||
"title": "Song", "artist": "", "duration": 0,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
msg = _Msg(text="!kept clear")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert bot.state.get("music", "keep:abc123.opus") is None
|
||||
assert any("Deleted 1 file(s)" in r for r in bot.replied)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFetchMetadata
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFetchMetadata:
|
||||
def test_success(self):
|
||||
result = MagicMock()
|
||||
result.stdout = "My Song\nArtist Name\n195.5\n"
|
||||
with patch("subprocess.run", return_value=result):
|
||||
meta = _mod._fetch_metadata("https://example.com/v")
|
||||
assert meta["title"] == "My Song"
|
||||
assert meta["artist"] == "Artist Name"
|
||||
assert meta["duration"] == 195.5
|
||||
|
||||
def test_partial_output(self):
|
||||
result = MagicMock()
|
||||
result.stdout = "Only Title\n"
|
||||
with patch("subprocess.run", return_value=result):
|
||||
meta = _mod._fetch_metadata("https://example.com/v")
|
||||
assert meta["title"] == "Only Title"
|
||||
assert meta["artist"] == ""
|
||||
assert meta["duration"] == 0
|
||||
|
||||
def test_error_returns_empty(self):
|
||||
with patch("subprocess.run", side_effect=Exception("fail")):
|
||||
meta = _mod._fetch_metadata("https://example.com/v")
|
||||
assert meta["title"] == ""
|
||||
assert meta["artist"] == ""
|
||||
assert meta["duration"] == 0
|
||||
|
||||
Reference in New Issue
Block a user