Files
derp/tests/test_music.py
user 36da191b45 fix: download track on !keep when local file is missing
When the initial download failed during playback and the track streamed
directly from URL, !keep would refuse with "No local file". Now it
downloads the track on the spot before keeping it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 17:01:44 +01:00

2264 lines
82 KiB
Python

"""Tests for the music playback plugin."""
import asyncio
import importlib.util
import json
import sys
import time
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
_spec = importlib.util.spec_from_file_location("music", "plugins/music.py")
_mod = importlib.util.module_from_spec(_spec)
sys.modules["music"] = _mod
_spec.loader.exec_module(_mod)
# -- Fakes -------------------------------------------------------------------
class _FakeState:
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, ns: str, key: str) -> str | None:
return self._store.get(ns, {}).get(key)
def set(self, ns: str, key: str, value: str) -> None:
self._store.setdefault(ns, {})[key] = value
def delete(self, ns: str, key: str) -> None:
self._store.get(ns, {}).pop(key, None)
def keys(self, ns: str) -> list[str]:
return list(self._store.get(ns, {}).keys())
class _FakeRegistry:
"""Minimal registry with shared voice timestamp."""
def __init__(self):
self._voice_ts: float = 0.0
class _FakeBot:
"""Minimal bot for music plugin testing."""
def __init__(self, *, mumble: bool = True):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self.config: dict = {}
self._pstate: dict = {}
self._tasks: set[asyncio.Task] = set()
self.registry = _FakeRegistry()
if mumble:
self.stream_audio = AsyncMock()
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def long_reply(self, message, lines: list[str], *,
label: str = "") -> None:
for line in lines:
self.replied.append(line)
def _is_admin(self, message) -> bool:
return False
def _spawn(self, coro, *, name=None):
task = asyncio.ensure_future(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
return task
class _Msg:
"""Minimal message object."""
def __init__(self, text="!play url", nick="Alice", target="0",
is_channel=True):
self.text = text
self.nick = nick
self.target = target
self.is_channel = is_channel
self.prefix = nick
self.command = "PRIVMSG"
self.params = [target, text]
self.tags = {}
self.raw = {}
# ---------------------------------------------------------------------------
# TestMumbleGuard
# ---------------------------------------------------------------------------
class TestMumbleGuard:
def test_is_mumble_true(self):
bot = _FakeBot(mumble=True)
assert _mod._is_mumble(bot) is True
def test_is_mumble_false(self):
bot = _FakeBot(mumble=False)
assert _mod._is_mumble(bot) is False
def test_play_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!play https://example.com")
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
def test_stop_non_mumble_silent(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!stop")
asyncio.run(_mod.cmd_stop(bot, msg))
assert bot.replied == []
def test_skip_non_mumble_silent(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert bot.replied == []
# ---------------------------------------------------------------------------
# TestPlayCommand
# ---------------------------------------------------------------------------
class TestPlayCommand:
def test_play_no_url(self):
bot = _FakeBot()
msg = _Msg(text="!play")
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_play_queues_track(self):
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/track")
tracks = [("https://example.com/track", "Test Track")]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Playing" in r for r in bot.replied)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].title == "Test Track"
def test_play_search_query(self):
bot = _FakeBot()
msg = _Msg(text="!play classical music")
tracks = [
("https://youtube.com/watch?v=a", "Result 1"),
("https://youtube.com/watch?v=b", "Result 2"),
("https://youtube.com/watch?v=c", "Result 3"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks) as mock_rt:
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
# Should prepend ytsearch10: for non-URL input
mock_rt.assert_called_once()
assert mock_rt.call_args[0][0] == "ytsearch10:classical music"
# Should pick one random result, not enqueue all
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert any("Playing" in r for r in bot.replied)
def test_play_shows_queued_when_busy(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(
url="x", title="Current", requester="Bob",
)
msg = _Msg(text="!play https://example.com/next")
tracks = [("https://example.com/next", "Next Track")]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
asyncio.run(_mod.cmd_play(bot, msg))
assert any("Queued" in r for r in bot.replied)
def test_play_queue_full(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["queue"] = [
_mod._Track(url="x", title="t", requester="a")
for _ in range(_mod._MAX_QUEUE)
]
msg = _Msg(text="!play https://example.com/overflow")
asyncio.run(_mod.cmd_play(bot, msg))
assert any("full" in r.lower() for r in bot.replied)
# ---------------------------------------------------------------------------
# TestStopCommand
# ---------------------------------------------------------------------------
class TestStopCommand:
def test_stop_clears_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["queue"] = [_mod._Track(url="x", title="t", requester="a")]
ps["current"] = _mod._Track(url="y", title="s", requester="b")
msg = _Msg(text="!stop")
asyncio.run(_mod.cmd_stop(bot, msg))
assert ps["queue"] == []
assert ps["current"] is None
assert any("Stopped" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestSkipCommand
# ---------------------------------------------------------------------------
class TestSkipCommand:
def test_skip_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("Nothing" in r for r in bot.replied)
def test_skip_with_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="First", requester="x")
ps["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
# We need to mock the task
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("Skipped" in r for r in bot.replied)
mock_task.cancel.assert_called_once()
def test_skip_empty_queue(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="Only", requester="x")
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
asyncio.run(_mod.cmd_skip(bot, msg))
assert any("empty" in r.lower() for r in bot.replied)
# ---------------------------------------------------------------------------
# TestQueueCommand
# ---------------------------------------------------------------------------
class TestQueueCommand:
def test_queue_empty(self):
bot = _FakeBot()
msg = _Msg(text="!queue")
asyncio.run(_mod.cmd_queue(bot, msg))
assert any("empty" in r.lower() for r in bot.replied)
def test_queue_with_tracks(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="Now", requester="x")
ps["queue"] = [
_mod._Track(url="b", title="Next", requester="y"),
]
msg = _Msg(text="!queue")
asyncio.run(_mod.cmd_queue(bot, msg))
assert any("Now" in r for r in bot.replied)
assert any("Next" in r for r in bot.replied)
def test_queue_with_url_delegates(self):
bot = _FakeBot()
msg = _Msg(text="!queue https://example.com/track")
tracks = [("https://example.com/track", "Title")]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_queue(bot, msg))
# Should have called cmd_play logic
assert any("Playing" in r or "Queued" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestNpCommand
# ---------------------------------------------------------------------------
class TestNpCommand:
def test_np_nothing(self):
bot = _FakeBot()
msg = _Msg(text="!np")
asyncio.run(_mod.cmd_np(bot, msg))
assert any("Nothing" in r for r in bot.replied)
def test_np_playing(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(
url="x", title="Cool Song", requester="DJ",
)
msg = _Msg(text="!np")
asyncio.run(_mod.cmd_np(bot, msg))
assert any("Cool Song" in r for r in bot.replied)
assert any("DJ" in r for r in bot.replied)
assert any("0:00" in r for r in bot.replied)
def test_np_shows_elapsed(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(
url="x", title="Cool Song", requester="DJ",
)
ps["cur_seek"] = 60.0
ps["progress"] = [1500] # 1500 * 0.02 = 30s
msg = _Msg(text="!np")
asyncio.run(_mod.cmd_np(bot, msg))
assert any("1:30" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestVolumeCommand
# ---------------------------------------------------------------------------
class TestVolumeCommand:
def test_volume_show(self):
bot = _FakeBot()
msg = _Msg(text="!volume")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("50%" in r for r in bot.replied)
def test_volume_set(self):
bot = _FakeBot()
msg = _Msg(text="!volume 75")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 75
assert any("75%" in r for r in bot.replied)
def test_volume_out_of_range(self):
bot = _FakeBot()
msg = _Msg(text="!volume 150")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_negative_absolute(self):
"""Bare negative that underflows clamps at 0-100 error."""
bot = _FakeBot()
_mod._ps(bot)["volume"] = 5
msg = _Msg(text="!volume -10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_relative_up(self):
bot = _FakeBot()
msg = _Msg(text="!volume +15")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 65
assert any("65%" in r for r in bot.replied)
def test_volume_relative_down(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 80
msg = _Msg(text="!volume -20")
asyncio.run(_mod.cmd_volume(bot, msg))
ps = _mod._ps(bot)
assert ps["volume"] == 60
assert any("60%" in r for r in bot.replied)
def test_volume_relative_clamp_over(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 95
msg = _Msg(text="!volume +10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_relative_clamp_under(self):
bot = _FakeBot()
_mod._ps(bot)["volume"] = 5
msg = _Msg(text="!volume -10")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_volume_invalid(self):
bot = _FakeBot()
msg = _Msg(text="!volume abc")
asyncio.run(_mod.cmd_volume(bot, msg))
assert any("Usage" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestPerBotState
# ---------------------------------------------------------------------------
class TestPerBotState:
def test_ps_initializes(self):
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["queue"] == []
assert ps["current"] is None
assert ps["volume"] == 50
def test_ps_stable_reference(self):
bot = _FakeBot()
ps1 = _mod._ps(bot)
ps2 = _mod._ps(bot)
assert ps1 is ps2
def test_ps_isolated_per_bot(self):
bot1 = _FakeBot()
bot2 = _FakeBot()
_mod._ps(bot1)["volume"] = 80
assert _mod._ps(bot2)["volume"] == 50
# ---------------------------------------------------------------------------
# TestHelpers
# ---------------------------------------------------------------------------
class TestMusicHelpers:
def test_truncate_short(self):
assert _mod._truncate("short") == "short"
def test_truncate_long(self):
long = "x" * 100
result = _mod._truncate(long)
assert len(result) == 80
assert result.endswith("...")
def test_is_url_http(self):
assert _mod._is_url("https://youtube.com/watch?v=abc") is True
def test_is_url_plain_http(self):
assert _mod._is_url("http://example.com") is True
def test_is_url_ytsearch(self):
assert _mod._is_url("ytsearch:classical music") is True
def test_is_url_search_query(self):
assert _mod._is_url("classical music") is False
def test_is_url_single_word(self):
assert _mod._is_url("jazz") is False
# ---------------------------------------------------------------------------
# TestPlaylistExpansion
# ---------------------------------------------------------------------------
class TestPlaylistExpansion:
def test_enqueue_multiple_tracks(self):
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/playlist")
tracks = [
("https://example.com/1", "Track 1"),
("https://example.com/2", "Track 2"),
("https://example.com/3", "Track 3"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
ps = _mod._ps(bot)
assert len(ps["queue"]) == 3
assert any("Queued 3 tracks" in r for r in bot.replied)
def test_truncate_at_queue_limit(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
# Fill queue to 2 slots remaining
ps["queue"] = [
_mod._Track(url="x", title="t", requester="a")
for _ in range(_mod._MAX_QUEUE - 2)
]
msg = _Msg(text="!play https://example.com/playlist")
tracks = [
("https://example.com/1", "Track 1"),
("https://example.com/2", "Track 2"),
("https://example.com/3", "Track 3"),
("https://example.com/4", "Track 4"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
asyncio.run(_mod.cmd_play(bot, msg))
assert len(ps["queue"]) == _mod._MAX_QUEUE
assert any("2 of 4" in r for r in bot.replied)
def test_start_loop_when_idle(self):
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/playlist")
tracks = [
("https://example.com/1", "Track 1"),
("https://example.com/2", "Track 2"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_play(bot, msg))
mock_loop.assert_called_once()
def test_no_loop_start_when_busy(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Current", requester="a")
msg = _Msg(text="!play https://example.com/playlist")
tracks = [
("https://example.com/1", "Track 1"),
("https://example.com/2", "Track 2"),
]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_play(bot, msg))
mock_loop.assert_not_called()
def test_resolve_tracks_single_video(self):
"""Subprocess returning a single url+title pair."""
result = MagicMock()
result.stdout = "https://example.com/v1\nSingle Video\n"
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/v1")
assert tracks == [("https://example.com/v1", "Single Video")]
def test_resolve_tracks_na_url_fallback(self):
"""--flat-playlist prints NA for single videos; use original URL."""
result = MagicMock()
result.stdout = "NA\nSingle Video\n"
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/v1")
assert tracks == [("https://example.com/v1", "Single Video")]
def test_resolve_tracks_playlist(self):
"""Subprocess returning multiple url+title pairs."""
result = MagicMock()
result.stdout = (
"https://example.com/1\nFirst\n"
"https://example.com/2\nSecond\n"
)
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/pl")
assert len(tracks) == 2
assert tracks[0] == ("https://example.com/1", "First")
assert tracks[1] == ("https://example.com/2", "Second")
def test_resolve_tracks_preserves_playlist_url(self):
"""Video+playlist URL passes through to yt-dlp intact."""
result = MagicMock()
result.stdout = (
"https://youtube.com/watch?v=a\nFirst\n"
"https://youtube.com/watch?v=b\nSecond\n"
)
url = "https://www.youtube.com/watch?v=a&list=PLxyz&index=1"
with patch("subprocess.run", return_value=result) as mock_run:
tracks = _mod._resolve_tracks(url)
# URL must reach yt-dlp with &list= intact
called_url = mock_run.call_args[0][0][-1]
assert "list=PLxyz" in called_url
assert len(tracks) == 2
def test_random_fragment_shuffles(self):
"""#random fragment shuffles resolved playlist tracks."""
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/playlist#random")
tracks = [(f"https://example.com/{i}", f"Track {i}") for i in range(20)]
with patch.object(_mod, "_resolve_tracks", return_value=list(tracks)) as mock_rt:
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
# Fragment stripped before passing to resolver
called_url = mock_rt.call_args[0][0]
assert "#random" not in called_url
ps = _mod._ps(bot)
assert len(ps["queue"]) == 20
# Extremely unlikely (1/20!) that shuffle preserves exact order
titles = [t.title for t in ps["queue"]]
assert titles != [f"Track {i}" for i in range(20)] or len(titles) == 1
# Announces shuffle
assert any("shuffled" in r for r in bot.replied)
def test_random_fragment_single_track_no_error(self):
"""#random on a single-video URL works fine (nothing to shuffle)."""
bot = _FakeBot()
msg = _Msg(text="!play https://example.com/video#random")
tracks = [("https://example.com/video", "Solo Track")]
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].title == "Solo Track"
def test_random_fragment_ignored_for_search(self):
"""#random is not treated specially for search queries."""
bot = _FakeBot()
msg = _Msg(text="!play jazz #random")
tracks = [("https://example.com/1", "Result")]
with patch.object(_mod, "_resolve_tracks", return_value=tracks) as mock_rt:
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
# Search query passed as-is (not a URL, fragment not stripped)
assert mock_rt.call_args[0][0] == "ytsearch10:jazz #random"
def test_resolve_tracks_error_fallback(self):
"""On error, returns [(url, url)]."""
with patch("subprocess.run", side_effect=Exception("fail")):
tracks = _mod._resolve_tracks("https://example.com/bad")
assert tracks == [("https://example.com/bad", "https://example.com/bad")]
def test_resolve_tracks_empty_output(self):
"""Empty stdout returns fallback."""
result = MagicMock()
result.stdout = ""
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/empty")
assert tracks == [("https://example.com/empty", "https://example.com/empty")]
def test_resolve_tracks_start_param(self):
"""start= passes --playlist-start to yt-dlp."""
result = MagicMock()
result.stdout = "https://example.com/6\nTrack 6\n"
with patch("subprocess.run", return_value=result) as mock_run:
tracks = _mod._resolve_tracks("https://example.com/pl",
max_tracks=5, start=6)
cmd = mock_run.call_args[0][0]
assert "--playlist-start=6" in cmd
assert "--playlist-end=10" in cmd
assert tracks == [("https://example.com/6", "Track 6")]
def test_resolve_tracks_start_empty_returns_empty(self):
"""Paginated call with no results returns [] (not fallback)."""
result = MagicMock()
result.stdout = ""
with patch("subprocess.run", return_value=result):
tracks = _mod._resolve_tracks("https://example.com/pl",
start=100)
assert tracks == []
def test_resolve_tracks_start_error_returns_empty(self):
"""Paginated call on error returns [] (not fallback)."""
with patch("subprocess.run", side_effect=Exception("fail")):
tracks = _mod._resolve_tracks("https://example.com/pl",
start=10)
assert tracks == []
def test_playlist_url_triggers_batched_resolve(self):
"""Playlist URL resolves initial batch, spawns feeder for rest."""
bot = _FakeBot()
batch = _mod._PLAYLIST_BATCH
initial = [(f"https://example.com/{i}", f"T{i}")
for i in range(batch)]
spawned = []
orig_spawn = bot._spawn
def spy_spawn(coro, *, name=None):
spawned.append(name)
return orig_spawn(coro, name=name)
bot._spawn = spy_spawn
msg = _Msg(text="!play https://example.com/watch?v=a&list=PLxyz")
with patch.object(_mod, "_resolve_tracks", return_value=initial):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
ps = _mod._ps(bot)
assert len(ps["queue"]) == batch
assert "music-playlist-feeder" in spawned
assert any("resolving more" in r.lower() for r in bot.replied)
def test_non_playlist_url_no_feeder(self):
"""Single video URL does not spawn background feeder."""
bot = _FakeBot()
spawned = []
orig_spawn = bot._spawn
def spy_spawn(coro, *, name=None):
spawned.append(name)
return orig_spawn(coro, name=name)
bot._spawn = spy_spawn
tracks = [("https://example.com/v", "Video")]
msg = _Msg(text="!play https://example.com/v")
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_play(bot, msg))
assert "music-playlist-feeder" not in spawned
def test_playlist_feeder_appends_to_queue(self):
"""Background feeder resolves remaining tracks into queue."""
bot = _FakeBot()
ps = _mod._ps(bot)
remaining = [("https://example.com/6", "Track 6"),
("https://example.com/7", "Track 7")]
async def _check():
with patch.object(_mod, "_resolve_tracks",
return_value=remaining):
await _mod._playlist_feeder(
bot, "https://example.com/pl", 6, 10,
False, "Alice", "https://example.com/pl",
)
assert len(ps["queue"]) == 2
assert ps["queue"][0].title == "Track 6"
assert ps["queue"][1].requester == "Alice"
asyncio.run(_check())
def test_playlist_feeder_shuffles(self):
"""Background feeder shuffles when shuffle=True."""
bot = _FakeBot()
ps = _mod._ps(bot)
remaining = [(f"https://example.com/{i}", f"T{i}")
for i in range(20)]
async def _check():
with patch.object(_mod, "_resolve_tracks",
return_value=list(remaining)):
await _mod._playlist_feeder(
bot, "https://example.com/pl", 6, 20,
True, "Alice", "",
)
titles = [t.title for t in ps["queue"]]
assert len(titles) == 20
# Extremely unlikely shuffle preserves order
assert titles != [f"T{i}" for i in range(20)]
asyncio.run(_check())
def test_playlist_feeder_respects_queue_cap(self):
"""Background feeder stops at _MAX_QUEUE."""
bot = _FakeBot()
ps = _mod._ps(bot)
# Pre-fill queue to near capacity
ps["queue"] = [_mod._Track(url="x", title="t", requester="a")
for _ in range(_mod._MAX_QUEUE - 2)]
remaining = [(f"https://example.com/{i}", f"T{i}")
for i in range(10)]
async def _check():
with patch.object(_mod, "_resolve_tracks",
return_value=remaining):
await _mod._playlist_feeder(
bot, "url", 6, 10, False, "a", "",
)
assert len(ps["queue"]) == _mod._MAX_QUEUE
asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestResumeState
# ---------------------------------------------------------------------------
class TestResumeState:
def test_save_load_roundtrip(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 125.5)
data = _mod._load_resume(bot)
assert data is not None
assert data["url"] == "https://example.com/a"
assert data["title"] == "Song"
assert data["requester"] == "Alice"
assert data["elapsed"] == 125.5
def test_clear_removes_state(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
_mod._clear_resume(bot)
assert _mod._load_resume(bot) is None
def test_load_returns_none_when_empty(self):
bot = _FakeBot()
assert _mod._load_resume(bot) is None
def test_load_returns_none_on_corrupt_json(self):
bot = _FakeBot()
bot.state.set("music", "resume", "not-json{{{")
assert _mod._load_resume(bot) is None
def test_load_returns_none_on_missing_url(self):
bot = _FakeBot()
bot.state.set("music", "resume", '{"title": "x"}')
assert _mod._load_resume(bot) is None
def test_save_strips_youtube_playlist_params(self):
"""_save_resume strips &list= and other playlist params from YouTube URLs."""
bot = _FakeBot()
track = _mod._Track(
url="https://www.youtube.com/watch?v=abc123&list=RDabc123&start_radio=1&pp=xyz",
title="Song", requester="Alice",
)
_mod._save_resume(bot, track, 60.0)
data = _mod._load_resume(bot)
assert data is not None
assert data["url"] == "https://www.youtube.com/watch?v=abc123"
def test_save_preserves_non_youtube_urls(self):
"""_save_resume leaves non-YouTube URLs unchanged."""
bot = _FakeBot()
track = _mod._Track(
url="https://soundcloud.com/artist/track?ref=playlist",
title="Song", requester="Alice",
)
_mod._save_resume(bot, track, 30.0)
data = _mod._load_resume(bot)
assert data["url"] == "https://soundcloud.com/artist/track?ref=playlist"
# ---------------------------------------------------------------------------
# TestStripPlaylistParams
# ---------------------------------------------------------------------------
class TestStripPlaylistParams:
def test_strips_list_param(self):
url = "https://www.youtube.com/watch?v=abc&list=PLxyz&index=3"
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc"
def test_strips_radio_params(self):
url = "https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1&pp=xyz"
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc"
def test_preserves_plain_url(self):
url = "https://www.youtube.com/watch?v=abc123"
assert _mod._strip_playlist_params(url) == "https://www.youtube.com/watch?v=abc123"
def test_non_youtube_unchanged(self):
url = "https://soundcloud.com/track?list=abc"
assert _mod._strip_playlist_params(url) == url
def test_youtu_be_without_v_param(self):
url = "https://youtu.be/abc123"
assert _mod._strip_playlist_params(url) == url
# ---------------------------------------------------------------------------
# TestResumeCommand
# ---------------------------------------------------------------------------
class TestResumeCommand:
def test_nothing_saved(self):
bot = _FakeBot()
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Nothing to resume" in r for r in bot.replied)
def test_already_playing(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Already playing" in r for r in bot.replied)
def test_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!resume")
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
def test_loads_track_and_seeks(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 225.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod.cmd_resume(bot, msg))
mock_loop.assert_called_once_with(bot, seek=225.0)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].url == "https://example.com/a"
assert any("Resuming" in r for r in bot.replied)
def test_time_format_in_reply(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 225.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_resume(bot, msg))
assert any("3:45" in r for r in bot.replied)
def test_clears_resume_state_after_loading(self):
bot = _FakeBot()
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
msg = _Msg(text="!resume")
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_resume(bot, msg))
assert _mod._load_resume(bot) is None
# ---------------------------------------------------------------------------
# TestFmtTime
# ---------------------------------------------------------------------------
class TestFmtTime:
def test_zero(self):
assert _mod._fmt_time(0) == "0:00"
def test_seconds_only(self):
assert _mod._fmt_time(45) == "0:45"
def test_minutes_and_seconds(self):
assert _mod._fmt_time(225) == "3:45"
def test_large_value(self):
assert _mod._fmt_time(3661) == "61:01"
# ---------------------------------------------------------------------------
# TestDuckCommand
# ---------------------------------------------------------------------------
class TestDuckCommand:
def test_show_status(self):
bot = _FakeBot()
msg = _Msg(text="!duck")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("Duck:" in r for r in bot.replied)
assert any("floor=1%" in r for r in bot.replied)
assert any("restore=30s" in r for r in bot.replied)
def test_toggle_on(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = False
msg = _Msg(text="!duck on")
asyncio.run(_mod.cmd_duck(bot, msg))
assert ps["duck_enabled"] is True
assert any("enabled" in r for r in bot.replied)
def test_toggle_off(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_vol"] = 5.0
msg = _Msg(text="!duck off")
asyncio.run(_mod.cmd_duck(bot, msg))
assert ps["duck_enabled"] is False
assert ps["duck_vol"] is None
assert any("disabled" in r for r in bot.replied)
def test_set_floor(self):
bot = _FakeBot()
msg = _Msg(text="!duck floor 10")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_floor"] == 10
assert any("10%" in r for r in bot.replied)
def test_set_floor_invalid(self):
bot = _FakeBot()
msg = _Msg(text="!duck floor 200")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("0-100" in r for r in bot.replied)
def test_set_silence(self):
bot = _FakeBot()
msg = _Msg(text="!duck silence 30")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_silence"] == 30
assert any("30s" in r for r in bot.replied)
def test_set_restore(self):
bot = _FakeBot()
msg = _Msg(text="!duck restore 45")
asyncio.run(_mod.cmd_duck(bot, msg))
ps = _mod._ps(bot)
assert ps["duck_restore"] == 45
assert any("45s" in r for r in bot.replied)
def test_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!duck")
asyncio.run(_mod.cmd_duck(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestDuckMonitor
# ---------------------------------------------------------------------------
class TestDuckMonitor:
def test_voice_detected_ducks_to_floor(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
bot.registry._voice_ts = time.monotonic()
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_silence_begins_smooth_restore(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 1
ps["duck_restore"] = 10 # 10s total restore
ps["volume"] = 50
bot.registry._voice_ts = time.monotonic() - 100
ps["duck_vol"] = 1.0 # already ducked
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
# After ~1s into a 10s ramp from 1->50, vol should be ~5-6
vol = ps["duck_vol"]
assert vol is not None and vol > 1.0
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_full_restore_sets_none(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 1
ps["duck_restore"] = 1 # 1s restore -- completes quickly
ps["volume"] = 50
bot.registry._voice_ts = time.monotonic() - 100
ps["duck_vol"] = 1.0
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
# First tick starts restore, second tick sees elapsed >= dur
await asyncio.sleep(2.5)
assert ps["duck_vol"] is None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_reduck_during_restore(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
ps["duck_restore"] = 30
ps["volume"] = 50
bot.registry._voice_ts = time.monotonic() - 100
ps["duck_vol"] = 30.0 # mid-restore
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(0.5)
# Simulate voice arriving now
bot.registry._voice_ts = time.monotonic()
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0 # re-ducked to floor
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_disabled_no_ducking(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = False
bot.registry._voice_ts = time.monotonic()
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] is None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_tts_active_ducks(self):
"""TTS activity from voice peer triggers ducking."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
ps["duck_restore"] = 1 # fast restore for test
bot.registry._voice_ts = 0.0
bot.registry._tts_active = True
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0
# TTS ends -- restore should begin and complete quickly
bot.registry._tts_active = False
await asyncio.sleep(2.5)
assert ps["duck_vol"] is None
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_tts_active_overrides_all_muted(self):
"""TTS ducks even when all users are muted."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["duck_enabled"] = True
ps["duck_floor"] = 5
bot.registry._voice_ts = time.monotonic()
bot.registry._tts_active = True
# Simulate all users muted
bot._mumble = MagicMock()
bot._mumble.users = {1: {"name": "human", "self_mute": True,
"mute": False, "self_deaf": False}}
bot.registry._bots = {}
async def _check():
task = asyncio.create_task(_mod._duck_monitor(bot))
await asyncio.sleep(1.5)
assert ps["duck_vol"] == 5.0
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestAutoResume
# ---------------------------------------------------------------------------
class TestAutoResume:
def test_resume_on_silence(self):
"""Auto-resume loads saved state when channel is silent."""
bot = _FakeBot()
bot._connect_count = 2
bot.registry._voice_ts = 0.0
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 120.0)
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_called_once_with(bot, seek=120.0)
ps = _mod._ps(bot)
assert len(ps["queue"]) == 1
assert ps["queue"][0].url == "https://example.com/a"
# Resume state cleared after loading
assert _mod._load_resume(bot) is None
def test_no_resume_if_playing(self):
"""Auto-resume returns early when already playing."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_not_called()
def test_no_resume_if_no_state(self):
"""Auto-resume returns early when nothing is saved."""
bot = _FakeBot()
bot.registry._voice_ts = 0.0
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._auto_resume(bot))
mock_loop.assert_not_called()
def test_abort_if_voice_active(self):
"""Auto-resume aborts if voice never goes silent within deadline."""
bot = _FakeBot()
now = time.monotonic()
bot.registry._voice_ts = now
ps = _mod._ps(bot)
ps["duck_silence"] = 15
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
async def _check():
# Patch monotonic to jump past the 60s deadline; keep voice active
mono_val = [now]
_real_sleep = asyncio.sleep
def _fast_mono():
return mono_val[0]
async def _fast_sleep(s):
mono_val[0] += s
bot.registry._voice_ts = mono_val[0]
await _real_sleep(0)
with patch.object(time, "monotonic", side_effect=_fast_mono):
with patch("asyncio.sleep", side_effect=_fast_sleep):
with patch.object(_mod, "_ensure_loop") as mock_loop:
await _mod._auto_resume(bot)
mock_loop.assert_not_called()
asyncio.run(_check())
def test_reconnect_watcher_triggers_resume(self):
"""Watcher detects connect_count increment and calls _auto_resume."""
bot = _FakeBot()
bot._connect_count = 1
# Resume state must exist for watcher to call _auto_resume
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 60.0)
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
# Simulate reconnection
bot._connect_count = 2
await asyncio.sleep(3)
mock_ar.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_watcher_ignores_first_connect(self):
"""Watcher does not trigger on initial connection (count 0->1) without saved state."""
bot = _FakeBot()
bot._connect_count = 0
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 1
await asyncio.sleep(3)
mock_ar.assert_not_called()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_watcher_boot_resume_with_saved_state(self):
"""Watcher triggers boot-resume on first connect when state exists."""
bot = _FakeBot()
bot._connect_count = 0
track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
_mod._save_resume(bot, track, 30.0)
async def _check():
with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 1
await asyncio.sleep(3)
mock_ar.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_on_connected_starts_watcher(self):
"""on_connected() starts the reconnect watcher task."""
bot = _FakeBot()
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
# Close the coroutine to avoid RuntimeWarning
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
assert "music-reconnect-watcher" in spawned
ps = _mod._ps(bot)
assert ps["_watcher_task"] is not None
def test_on_connected_no_double_start(self):
"""on_connected() does not start a second watcher."""
bot = _FakeBot()
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
asyncio.run(_mod.on_connected(bot))
assert spawned.count("music-reconnect-watcher") == 1
# ---------------------------------------------------------------------------
# TestAutoplayKept
# ---------------------------------------------------------------------------
class TestAutoplayKept:
def test_starts_loop_with_kept_tracks(self, tmp_path):
"""Autoplay starts play loop when kept tracks exist."""
bot = _FakeBot()
bot.registry._voice_ts = 0.0
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "a.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Track A",
"filename": "a.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._autoplay_kept(bot))
mock_loop.assert_called_once_with(bot)
def test_skips_when_already_playing(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._autoplay_kept(bot))
mock_loop.assert_not_called()
def test_skips_when_no_kept_tracks(self):
bot = _FakeBot()
bot.registry._voice_ts = 0.0
with patch.object(_mod, "_ensure_loop") as mock_loop:
asyncio.run(_mod._autoplay_kept(bot))
mock_loop.assert_not_called()
def test_load_kept_tracks_skips_missing_files(self, tmp_path):
"""Tracks with missing local files are excluded."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/a", "title": "Gone",
"filename": "missing.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
tracks = _mod._load_kept_tracks(bot)
assert tracks == []
def test_watcher_autoplay_on_boot_no_resume(self):
"""Watcher triggers autoplay on boot when no resume state exists."""
bot = _FakeBot()
bot._connect_count = 0
async def _check():
with patch.object(_mod, "_autoplay_kept",
new_callable=AsyncMock) as mock_ap:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 1
await asyncio.sleep(3)
mock_ap.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
def test_watcher_autoplay_on_reconnect_no_resume(self):
"""Watcher triggers autoplay on reconnect when no resume state."""
bot = _FakeBot()
bot._connect_count = 1
async def _check():
with patch.object(_mod, "_autoplay_kept",
new_callable=AsyncMock) as mock_ap:
task = asyncio.create_task(_mod._reconnect_watcher(bot))
await asyncio.sleep(0.5)
bot._connect_count = 2
await asyncio.sleep(3)
mock_ap.assert_called_once_with(bot)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestDownloadTrack
# ---------------------------------------------------------------------------
class TestDownloadTrack:
def test_download_success(self, tmp_path):
"""Successful download returns a Path."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = str(music_dir / "abc123.opus") + "\n"
result.returncode = 0
# Create the file so is_file() returns True
music_dir.mkdir(parents=True)
(music_dir / "abc123.opus").write_bytes(b"audio")
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is not None
assert path.name == "abc123.opus"
def test_download_fallback_glob(self, tmp_path):
"""Falls back to glob when --print output is empty."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = ""
result.returncode = 0
music_dir.mkdir(parents=True)
(music_dir / "abc123.webm").write_bytes(b"audio")
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is not None
assert path.name == "abc123.webm"
def test_download_failure_returns_none(self, tmp_path):
"""Exception during download returns None."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
with patch("subprocess.run", side_effect=Exception("fail")):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is None
def test_download_no_file_returns_none(self, tmp_path):
"""No matching file on disk returns None."""
music_dir = tmp_path / "music"
with patch.object(_mod, "_MUSIC_DIR", music_dir):
result = MagicMock()
result.stdout = "/nonexistent/path.opus\n"
result.returncode = 0
music_dir.mkdir(parents=True)
with patch("subprocess.run", return_value=result):
path = _mod._download_track("https://example.com/v", "abc123")
assert path is None
# ---------------------------------------------------------------------------
# TestCleanupTrack
# ---------------------------------------------------------------------------
class TestCleanupTrack:
def test_cleanup_deletes_file(self, tmp_path):
"""Cleanup deletes the local file when keep=False."""
f = tmp_path / "test.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a",
local_path=f, keep=False,
)
_mod._cleanup_track(track)
assert not f.exists()
def test_cleanup_keeps_file_when_flagged(self, tmp_path):
"""Cleanup preserves the file when keep=True."""
f = tmp_path / "test.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a",
local_path=f, keep=True,
)
_mod._cleanup_track(track)
assert f.exists()
def test_cleanup_noop_when_no_path(self):
"""Cleanup does nothing when local_path is None."""
track = _mod._Track(url="x", title="t", requester="a")
_mod._cleanup_track(track) # should not raise
# ---------------------------------------------------------------------------
# TestKeepCommand
# ---------------------------------------------------------------------------
class TestKeepCommand:
def test_keep_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_keep_no_local_file_no_url(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="", title="t", requester="a")
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("No local file" in r for r in bot.replied)
def test_keep_downloads_when_no_local_file(self, tmp_path):
bot = _FakeBot()
ps = _mod._ps(bot)
track = _mod._Track(url="https://example.com/v", title="t",
requester="a")
ps["current"] = track
dl_file = tmp_path / "abc.opus"
dl_file.write_bytes(b"audio")
music_dir = tmp_path / "kept"
music_dir.mkdir()
meta = {"title": "t", "artist": "", "duration": 0}
msg = _Msg(text="!keep")
with patch.object(_mod, "_download_track", return_value=dl_file), \
patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_fetch_metadata", return_value=meta):
asyncio.run(_mod.cmd_keep(bot, msg))
assert track.keep is True
assert track.local_path is not None
assert any("Keeping" in r for r in bot.replied)
def test_keep_download_failure(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="https://example.com/v", title="t",
requester="a")
msg = _Msg(text="!keep")
with patch.object(_mod, "_download_track", return_value=None):
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Download failed" in r for r in bot.replied)
def test_keep_marks_track(self, tmp_path):
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "abc123.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="x", title="t", requester="a", local_path=f,
)
ps["current"] = track
msg = _Msg(text="!keep")
music_dir = tmp_path / "kept"
music_dir.mkdir()
meta = {"title": "t", "artist": "", "duration": 0}
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_fetch_metadata", return_value=meta):
asyncio.run(_mod.cmd_keep(bot, msg))
assert track.keep is True
assert any("Keeping" in r for r in bot.replied)
def test_keep_duplicate_blocked(self, tmp_path):
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "abc123.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="https://example.com/v", title="t", requester="a",
local_path=f,
)
ps["current"] = track
# Pre-existing kept entry with same URL
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/v", "id": 1,
}))
bot.state.set("music", "keep_next_id", "2")
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Already kept" in r for r in bot.replied)
assert any("#1" in r for r in bot.replied)
# ID counter should not have incremented
assert bot.state.get("music", "keep_next_id") == "2"
def test_keep_duplicate_with_playlist_params(self, tmp_path):
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "abc123.opus"
f.write_bytes(b"audio")
# Track URL has playlist cruft
track = _mod._Track(
url="https://www.youtube.com/watch?v=abc&list=RDabc&start_radio=1",
title="t", requester="a", local_path=f,
)
ps["current"] = track
# Existing entry stored with clean URL
bot.state.set("music", "keep:1", json.dumps({
"url": "https://www.youtube.com/watch?v=abc", "id": 1,
}))
bot.state.set("music", "keep_next_id", "2")
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Already kept" in r for r in bot.replied)
assert bot.state.get("music", "keep_next_id") == "2"
def test_keep_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!keep")
asyncio.run(_mod.cmd_keep(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestKeptCommand
# ---------------------------------------------------------------------------
class TestKeptCommand:
def test_kept_empty(self):
bot = _FakeBot()
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("No kept tracks" in r for r in bot.replied)
def test_kept_lists_tracks(self, tmp_path):
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"x" * 1024)
bot.state.set("music", "keep:1", json.dumps({
"title": "Test Track", "artist": "", "duration": 0,
"filename": "abc123.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Kept tracks" in r for r in bot.replied)
assert any("#1" in r for r in bot.replied)
assert any("Test Track" in r for r in bot.replied)
def test_kept_clear(self, tmp_path):
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"audio")
(music_dir / "def456.webm").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({"id": 1}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept clear")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Deleted 2 file(s)" in r for r in bot.replied)
assert not list(music_dir.iterdir())
assert bot.state.get("music", "keep:1") is None
def test_kept_shows_missing_marker(self, tmp_path):
"""Tracks with missing files show [MISSING] in listing."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"title": "Gone Track", "artist": "", "duration": 0,
"filename": "gone.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("MISSING" in r for r in bot.replied)
def test_kept_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Mumble-only" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestParseSeek
# ---------------------------------------------------------------------------
class TestParseSeek:
def test_absolute_seconds(self):
assert _mod._parse_seek("90") == ("abs", 90.0)
def test_absolute_mss(self):
assert _mod._parse_seek("1:30") == ("abs", 90.0)
def test_relative_forward(self):
assert _mod._parse_seek("+30") == ("rel", 30.0)
def test_relative_backward(self):
assert _mod._parse_seek("-30") == ("rel", -30.0)
def test_relative_mss(self):
assert _mod._parse_seek("+1:30") == ("rel", 90.0)
def test_relative_backward_mss(self):
assert _mod._parse_seek("-1:30") == ("rel", -90.0)
def test_invalid_raises(self):
import pytest
with pytest.raises(ValueError):
_mod._parse_seek("abc")
def test_empty_raises(self):
import pytest
with pytest.raises(ValueError):
_mod._parse_seek("")
# ---------------------------------------------------------------------------
# TestSeekCommand
# ---------------------------------------------------------------------------
class TestSeekCommand:
def test_seek_nothing_playing(self):
bot = _FakeBot()
msg = _Msg(text="!seek 1:30")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Nothing playing" in r for r in bot.replied)
def test_seek_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!seek 1:30")
asyncio.run(_mod.cmd_seek(bot, msg))
assert bot.replied == []
def test_seek_no_arg(self):
bot = _FakeBot()
msg = _Msg(text="!seek")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_seek_invalid_arg(self):
bot = _FakeBot()
msg = _Msg(text="!seek xyz")
asyncio.run(_mod.cmd_seek(bot, msg))
assert any("Usage" in r for r in bot.replied)
def test_seek_absolute(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
ps["seek_req"] = [None]
ps["progress"] = [100]
msg = _Msg(text="!seek 1:30")
asyncio.run(_mod.cmd_seek(bot, msg))
assert ps["seek_req"][0] == 90.0
assert ps["cur_seek"] == 90.0
assert ps["progress"][0] == 0
assert any("1:30" in r for r in bot.replied)
def test_seek_relative_forward(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
ps["seek_req"] = [None]
ps["progress"] = [1500] # 1500 * 0.02 = 30s
ps["cur_seek"] = 60.0 # started at 60s
msg = _Msg(text="!seek +30")
asyncio.run(_mod.cmd_seek(bot, msg))
# elapsed = 60 + 30 = 90, target = 90 + 30 = 120
assert ps["seek_req"][0] == 120.0
assert ps["cur_seek"] == 120.0
assert ps["progress"][0] == 0
def test_seek_relative_backward_clamps(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="x", title="Song", requester="a")
ps["seek_req"] = [None]
ps["progress"] = [500] # 500 * 0.02 = 10s
ps["cur_seek"] = 0.0
msg = _Msg(text="!seek -30")
asyncio.run(_mod.cmd_seek(bot, msg))
# elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0
assert ps["seek_req"][0] == 0.0
assert ps["cur_seek"] == 0.0
assert ps["progress"][0] == 0
# ---------------------------------------------------------------------------
# TestVolumePersistence
# ---------------------------------------------------------------------------
class TestVolumePersistence:
def test_volume_persists_to_state(self):
bot = _FakeBot()
msg = _Msg(text="!volume 75")
asyncio.run(_mod.cmd_volume(bot, msg))
assert bot.state.get("music", "volume") == "75"
def test_volume_loads_on_connect(self):
bot = _FakeBot()
bot.state.set("music", "volume", "80")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 80
def test_volume_loads_clamps_high(self):
bot = _FakeBot()
bot.state.set("music", "volume", "200")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 100
def test_volume_loads_ignores_invalid(self):
bot = _FakeBot()
bot.state.set("music", "volume", "notanumber")
spawned = []
def fake_spawn(coro, *, name=None):
task = MagicMock()
task.done.return_value = False
spawned.append(name)
coro.close()
return task
bot._spawn = fake_spawn
asyncio.run(_mod.on_connected(bot))
ps = _mod._ps(bot)
assert ps["volume"] == 50 # default unchanged
# ---------------------------------------------------------------------------
# TestFadeAndCancel
# ---------------------------------------------------------------------------
class TestFadeAndCancel:
def test_sets_fade_state(self):
"""_fade_and_cancel sets fade_vol=0 and a fast fade_step."""
bot = _FakeBot()
ps = _mod._ps(bot)
# Create a fake task that stays "running"
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
async def _check():
task = asyncio.create_task(_mod._fade_and_cancel(bot, duration=0.1))
# Let the fade start
await asyncio.sleep(0.02)
assert ps["fade_vol"] == 0
assert ps["fade_step"] is not None
assert ps["fade_step"] > 0.01
await task
asyncio.run(_check())
def test_noop_when_no_task(self):
"""_fade_and_cancel returns immediately if no task is running."""
bot = _FakeBot()
_mod._ps(bot)
asyncio.run(_mod._fade_and_cancel(bot, duration=0.1))
def test_clears_fade_step_after_cancel(self):
"""fade_step is reset to None after cancellation."""
bot = _FakeBot()
ps = _mod._ps(bot)
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
asyncio.run(_mod._fade_and_cancel(bot, duration=0.1))
assert ps["fade_step"] is None
# ---------------------------------------------------------------------------
# TestPrevCommand
# ---------------------------------------------------------------------------
class TestPrevCommand:
def test_prev_no_history(self):
bot = _FakeBot()
msg = _Msg(text="!prev")
asyncio.run(_mod.cmd_prev(bot, msg))
assert any("No previous" in r for r in bot.replied)
def test_prev_pops_history(self):
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="Current", requester="x")
ps["history"] = [
_mod._Track(url="b", title="Previous", requester="y"),
]
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!prev")
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_prev(bot, msg))
assert any("Previous" in r for r in bot.replied)
# History should be empty (popped the only entry)
assert len(ps["history"]) == 0
# Queue should have: prev track, then current track
assert len(ps["queue"]) == 2
assert ps["queue"][0].title == "Previous"
assert ps["queue"][1].title == "Current"
def test_prev_non_mumble(self):
bot = _FakeBot(mumble=False)
msg = _Msg(text="!prev")
asyncio.run(_mod.cmd_prev(bot, msg))
assert bot.replied == []
def test_prev_no_current_track(self):
"""!prev with history but nothing currently playing."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["history"] = [
_mod._Track(url="b", title="Previous", requester="y"),
]
msg = _Msg(text="!prev")
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
with patch.object(_mod, "_ensure_loop"):
asyncio.run(_mod.cmd_prev(bot, msg))
# Only the prev track in queue (no current to re-queue)
assert len(ps["queue"]) == 1
assert ps["queue"][0].title == "Previous"
# ---------------------------------------------------------------------------
# TestHistoryTracking
# ---------------------------------------------------------------------------
class TestHistoryTracking:
def test_skip_pushes_to_history(self):
"""Skipping a track adds it to history."""
bot = _FakeBot()
ps = _mod._ps(bot)
ps["current"] = _mod._Track(url="a", title="First", requester="x")
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
asyncio.run(_mod.cmd_skip(bot, msg))
assert len(ps["history"]) == 1
assert ps["history"][0].title == "First"
def test_history_capped(self):
"""History does not exceed _MAX_HISTORY entries."""
bot = _FakeBot()
ps = _mod._ps(bot)
# Fill history to max
for i in range(_mod._MAX_HISTORY):
ps["history"].append(
_mod._Track(url=f"u{i}", title=f"T{i}", requester="a"),
)
assert len(ps["history"]) == _mod._MAX_HISTORY
# Skip another track, pushing to history
ps["current"] = _mod._Track(url="new", title="New", requester="x")
mock_task = MagicMock()
mock_task.done.return_value = False
ps["task"] = mock_task
msg = _Msg(text="!skip")
with patch.object(_mod, "_fade_and_cancel", new_callable=AsyncMock):
asyncio.run(_mod.cmd_skip(bot, msg))
assert len(ps["history"]) == _mod._MAX_HISTORY
assert ps["history"][-1].title == "New"
def test_ps_has_history(self):
"""_ps initializes with empty history list."""
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["history"] == []
# ---------------------------------------------------------------------------
# TestFadeState
# ---------------------------------------------------------------------------
class TestFadeState:
def test_ps_fade_fields_initialized(self):
"""_ps initializes fade_vol and fade_step to None."""
bot = _FakeBot()
ps = _mod._ps(bot)
assert ps["fade_vol"] is None
assert ps["fade_step"] is None
# ---------------------------------------------------------------------------
# TestKeepMetadata
# ---------------------------------------------------------------------------
class TestKeepMetadata:
def test_keep_stores_metadata(self, tmp_path):
"""!keep stores metadata JSON in bot.state keyed by ID."""
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "abc123.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="https://example.com/v", title="Test Song",
requester="a", local_path=f,
)
ps["current"] = track
msg = _Msg(text="!keep")
meta = {"title": "My Song", "artist": "Artist", "duration": 195.0}
music_dir = tmp_path / "kept"
music_dir.mkdir()
with patch.object(_mod, "_fetch_metadata", return_value=meta), \
patch.object(_mod, "_MUSIC_DIR", music_dir):
asyncio.run(_mod.cmd_keep(bot, msg))
assert track.keep is True
raw = bot.state.get("music", "keep:1")
assert raw is not None
stored = json.loads(raw)
assert stored["title"] == "My Song"
assert stored["artist"] == "Artist"
assert stored["duration"] == 195.0
assert stored["id"] == 1
assert any("My Song" in r for r in bot.replied)
assert any("Artist" in r for r in bot.replied)
assert any("3:15" in r for r in bot.replied)
def test_keep_no_artist(self, tmp_path):
"""!keep with empty artist omits the artist field."""
bot = _FakeBot()
ps = _mod._ps(bot)
f = tmp_path / "def456.opus"
f.write_bytes(b"audio")
track = _mod._Track(
url="https://example.com/v", title="Song",
requester="a", local_path=f,
)
ps["current"] = track
msg = _Msg(text="!keep")
meta = {"title": "Song", "artist": "NA", "duration": 60.0}
music_dir = tmp_path / "kept"
music_dir.mkdir()
with patch.object(_mod, "_fetch_metadata", return_value=meta), \
patch.object(_mod, "_MUSIC_DIR", music_dir):
asyncio.run(_mod.cmd_keep(bot, msg))
# Should not contain "NA" as artist
assert not any("NA" in r and "--" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestKeptMetadata
# ---------------------------------------------------------------------------
class TestKeptMetadata:
def test_kept_shows_metadata(self, tmp_path):
"""!kept displays metadata from bot.state."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"x" * 2048)
bot.state.set("music", "keep:1", json.dumps({
"title": "Cool Song", "artist": "DJ Test", "duration": 225.0,
"filename": "abc123.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("Cool Song" in r for r in bot.replied)
assert any("DJ Test" in r for r in bot.replied)
assert any("3:45" in r for r in bot.replied)
assert any("#1" in r for r in bot.replied)
def test_kept_fallback_no_title(self):
"""!kept falls back to filename when no title in metadata."""
bot = _FakeBot()
bot.state.set("music", "keep:1", json.dumps({
"title": "", "artist": "", "duration": 0,
"filename": "xyz789.webm", "id": 1,
}))
msg = _Msg(text="!kept")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("xyz789.webm" in r for r in bot.replied)
def test_kept_clear_removes_metadata(self, tmp_path):
"""!kept clear also removes stored metadata and resets ID."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "abc123.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"title": "Song", "artist": "", "duration": 0, "id": 1,
}))
bot.state.set("music", "keep_next_id", "2")
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept clear")
asyncio.run(_mod.cmd_kept(bot, msg))
assert bot.state.get("music", "keep:1") is None
assert bot.state.get("music", "keep_next_id") is None
assert any("Deleted 1 file(s)" in r for r in bot.replied)
# ---------------------------------------------------------------------------
# TestFetchMetadata
# ---------------------------------------------------------------------------
class TestFetchMetadata:
def test_success(self):
result = MagicMock()
result.stdout = "My Song\nArtist Name\n195.5\n"
with patch("subprocess.run", return_value=result):
meta = _mod._fetch_metadata("https://example.com/v")
assert meta["title"] == "My Song"
assert meta["artist"] == "Artist Name"
assert meta["duration"] == 195.5
def test_partial_output(self):
result = MagicMock()
result.stdout = "Only Title\n"
with patch("subprocess.run", return_value=result):
meta = _mod._fetch_metadata("https://example.com/v")
assert meta["title"] == "Only Title"
assert meta["artist"] == ""
assert meta["duration"] == 0
def test_error_returns_empty(self):
with patch("subprocess.run", side_effect=Exception("fail")):
meta = _mod._fetch_metadata("https://example.com/v")
assert meta["title"] == ""
assert meta["artist"] == ""
assert meta["duration"] == 0
# ---------------------------------------------------------------------------
# TestKeptRepair
# ---------------------------------------------------------------------------
class TestKeptRepair:
def test_repair_nothing_missing(self, tmp_path):
"""Repair reports all present when files exist."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
(music_dir / "song.opus").write_bytes(b"audio")
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/v", "title": "Song",
"filename": "song.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept repair")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("nothing to repair" in r.lower() for r in bot.replied)
def test_repair_downloads_missing(self, tmp_path):
"""Repair re-downloads missing files."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/v", "title": "Song",
"filename": "song.opus", "id": 1,
}))
dl_path = tmp_path / "cache" / "dl.opus"
dl_path.parent.mkdir()
dl_path.write_bytes(b"audio")
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=dl_path):
msg = _Msg(text="!kept repair")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("1 restored" in r for r in bot.replied)
assert (music_dir / "song.opus").is_file()
def test_repair_counts_failures(self, tmp_path):
"""Repair reports failed downloads."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/v", "title": "Song",
"filename": "song.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=None):
msg = _Msg(text="!kept repair")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("1 failed" in r for r in bot.replied)
def test_repair_no_url_skips(self, tmp_path):
"""Repair skips entries with no URL."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"url": "", "title": "No URL",
"filename": "nourl.opus", "id": 1,
}))
with patch.object(_mod, "_MUSIC_DIR", music_dir):
msg = _Msg(text="!kept repair")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("1 failed" in r for r in bot.replied)
def test_repair_extension_mismatch(self, tmp_path):
"""Repair updates metadata when download extension differs."""
bot = _FakeBot()
music_dir = tmp_path / "music"
music_dir.mkdir()
bot.state.set("music", "keep:1", json.dumps({
"url": "https://example.com/v", "title": "Song",
"filename": "song.opus", "id": 1,
}))
dl_path = tmp_path / "cache" / "dl.webm"
dl_path.parent.mkdir()
dl_path.write_bytes(b"audio")
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
patch.object(_mod, "_download_track", return_value=dl_path):
msg = _Msg(text="!kept repair")
asyncio.run(_mod.cmd_kept(bot, msg))
assert any("1 restored" in r for r in bot.replied)
# Filename updated to new extension
raw = bot.state.get("music", "keep:1")
stored = json.loads(raw)
assert stored["filename"] == "song.webm"
assert (music_dir / "song.webm").is_file()