Files
derp/tests/test_music.py
user c493583a71 feat: add !seek command and persist volume across restarts
Seek to absolute or relative positions mid-track via !seek. Supports
M:SS and plain seconds with +/- prefixes. Volume is now saved to
bot.state and restored on connect.
2026-02-22 03:31:35 +01:00

1362 lines
47 KiB
Python

"""Tests for the music playback plugin."""
import asyncio
import importlib.util
import sys
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# -- Load plugin module directly ---------------------------------------------
# Load plugins/music.py by file path instead of through a regular import.
# NOTE(review): the path is relative, so this presumably requires the tests to
# be run with the repository root as the working directory -- confirm.
_spec = importlib.util.spec_from_file_location("music", "plugins/music.py")
_mod = importlib.util.module_from_spec(_spec)
# Register the module under the name "music" before its body is executed.
sys.modules["music"] = _mod
_spec.loader.exec_module(_mod)
# -- Fakes -------------------------------------------------------------------
class _FakeState:
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, ns: str, key: str) -> str | None:
return self._store.get(ns, {}).get(key)
def set(self, ns: str, key: str, value: str) -> None:
self._store.setdefault(ns, {})[key] = value
def delete(self, ns: str, key: str) -> None:
self._store.get(ns, {}).pop(key, None)
def keys(self, ns: str) -> list[str]:
return list(self._store.get(ns, {}).keys())
class _FakeBot:
    """Minimal bot for music plugin testing.

    Outgoing traffic is recorded instead of sent: ``send`` appends
    ``(target, text)`` to ``sent`` and ``reply`` appends the text to
    ``replied``.  When *mumble* is true the instance gets a ``stream_audio``
    AsyncMock attribute; otherwise that attribute is absent.
    """

    def __init__(self, *, mumble: bool = True):
        self.sent: list[tuple[str, str]] = []
        self.replied: list[str] = []
        self.state = _FakeState()
        self.config: dict = {}
        self._pstate: dict = {}
        self._tasks: set[asyncio.Task] = set()
        if mumble:
            self.stream_audio = AsyncMock()

    async def send(self, target: str, text: str) -> None:
        """Record a message addressed to *target*."""
        self.sent.append((target, text))

    async def reply(self, message, text: str) -> None:
        """Record a reply; *message* itself is not inspected."""
        self.replied.append(text)

    def _is_admin(self, message) -> bool:
        """Nobody is an admin in the fake."""
        return False

    def _spawn(self, coro, *, name=None):
        """Schedule *coro* and track the task until it finishes.

        The optional *name* is applied to the task (it used to be silently
        dropped), which makes stray tasks identifiable in asyncio debug
        output.
        """
        task = asyncio.ensure_future(coro)
        # ensure_future returns a bare Future unchanged; only Tasks can be named.
        if name is not None and isinstance(task, asyncio.Task):
            task.set_name(name)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task
class _Msg:
"""Minimal message object."""
def __init__(self, text="!play url", nick="Alice", target="0",
is_channel=True):
self.text = text
self.nick = nick
self.target = target
self.is_channel = is_channel
self.prefix = nick
self.command = "PRIVMSG"
self.params = [target, text]
self.tags = {}
self.raw = {}
# ---------------------------------------------------------------------------
# TestMumbleGuard
# ---------------------------------------------------------------------------
class TestMumbleGuard:
    """Mumble detection, and how commands react on a non-Mumble bot."""

    def test_is_mumble_true(self):
        assert _mod._is_mumble(_FakeBot(mumble=True)) is True

    def test_is_mumble_false(self):
        assert _mod._is_mumble(_FakeBot(mumble=False)) is False

    def test_play_non_mumble(self):
        """!play on a non-Mumble bot answers with a Mumble-only notice."""
        plain_bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_play(plain_bot, _Msg(text="!play https://example.com")))
        assert any("Mumble-only" in line for line in plain_bot.replied)

    def test_stop_non_mumble_silent(self):
        """!stop on a non-Mumble bot sends no reply at all."""
        plain_bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_stop(plain_bot, _Msg(text="!stop")))
        assert plain_bot.replied == []

    def test_skip_non_mumble_silent(self):
        """!skip on a non-Mumble bot sends no reply at all."""
        plain_bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_skip(plain_bot, _Msg(text="!skip")))
        assert plain_bot.replied == []
# ---------------------------------------------------------------------------
# TestPlayCommand
# ---------------------------------------------------------------------------
class TestPlayCommand:
    """!play with no argument, a URL, a search query, and a full queue."""

    def test_play_no_url(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_play(bot, _Msg(text="!play")))
        assert any("Usage" in line for line in bot.replied)

    def test_play_queues_track(self):
        bot = _FakeBot()
        resolved = [("https://example.com/track", "Test Track")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/track")))
        assert any("Playing" in line for line in bot.replied)
        queue = _mod._ps(bot)["queue"]
        assert len(queue) == 1
        assert queue[0].title == "Test Track"

    def test_play_search_query(self):
        bot = _FakeBot()
        results = [
            ("https://youtube.com/watch?v=a", "Result 1"),
            ("https://youtube.com/watch?v=b", "Result 2"),
            ("https://youtube.com/watch?v=c", "Result 3"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=results) as resolver,
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play classical music")))
        # Non-URL input must be turned into a ytsearch10: query.
        resolver.assert_called_once()
        assert resolver.call_args[0][0] == "ytsearch10:classical music"
        # Only one of the search results is enqueued, not all of them.
        assert len(_mod._ps(bot)["queue"]) == 1
        assert any("Playing" in line for line in bot.replied)

    def test_play_shows_queued_when_busy(self):
        bot = _FakeBot()
        _mod._ps(bot)["current"] = _mod._Track(
            url="x", title="Current", requester="Bob",
        )
        resolved = [("https://example.com/next", "Next Track")]
        with patch.object(_mod, "_resolve_tracks", return_value=resolved):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/next")))
        assert any("Queued" in line for line in bot.replied)

    def test_play_queue_full(self):
        bot = _FakeBot()
        filler = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE)
        ]
        _mod._ps(bot)["queue"] = filler
        asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/overflow")))
        assert any("full" in line.lower() for line in bot.replied)
# ---------------------------------------------------------------------------
# TestStopCommand
# ---------------------------------------------------------------------------
class TestStopCommand:
    """!stop empties the queue, clears the current track and confirms."""

    def test_stop_clears_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["queue"] = [_mod._Track(url="x", title="t", requester="a")]
        state["current"] = _mod._Track(url="y", title="s", requester="b")
        asyncio.run(_mod.cmd_stop(bot, _Msg(text="!stop")))
        assert state["queue"] == []
        assert state["current"] is None
        assert any("Stopped" in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestSkipCommand
# ---------------------------------------------------------------------------
class TestSkipCommand:
    """!skip with nothing playing, with a queued follow-up, and with none."""

    def test_skip_nothing_playing(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("Nothing" in line for line in bot.replied)

    def test_skip_with_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="First", requester="x")
        state["queue"] = [_mod._Track(url="b", title="Second", requester="y")]
        # Stand-in for the playback task: it reports itself as still running.
        playback = MagicMock()
        playback.done.return_value = False
        state["task"] = playback
        with patch.object(_mod, "_ensure_loop"):
            asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("Skipped" in line for line in bot.replied)
        playback.cancel.assert_called_once()

    def test_skip_empty_queue(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Only", requester="x")
        playback = MagicMock()
        playback.done.return_value = False
        state["task"] = playback
        asyncio.run(_mod.cmd_skip(bot, _Msg(text="!skip")))
        assert any("empty" in line.lower() for line in bot.replied)
# ---------------------------------------------------------------------------
# TestQueueCommand
# ---------------------------------------------------------------------------
class TestQueueCommand:
    """!queue listing, plus !queue <url> behaving like !play."""

    def test_queue_empty(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        assert any("empty" in line.lower() for line in bot.replied)

    def test_queue_with_tracks(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="a", title="Now", requester="x")
        state["queue"] = [
            _mod._Track(url="b", title="Next", requester="y"),
        ]
        asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue")))
        for expected in ("Now", "Next"):
            assert any(expected in line for line in bot.replied)

    def test_queue_with_url_delegates(self):
        bot = _FakeBot()
        resolved = [("https://example.com/track", "Title")]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=resolved),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_queue(bot, _Msg(text="!queue https://example.com/track")))
        # The reply wording is the one produced by the !play path.
        assert any("Playing" in line or "Queued" in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestNpCommand
# ---------------------------------------------------------------------------
class TestNpCommand:
    """!np when idle and while a track is playing."""

    def test_np_nothing(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        assert any("Nothing" in line for line in bot.replied)

    def test_np_playing(self):
        bot = _FakeBot()
        _mod._ps(bot)["current"] = _mod._Track(
            url="x", title="Cool Song", requester="DJ",
        )
        asyncio.run(_mod.cmd_np(bot, _Msg(text="!np")))
        # Both the title and the requester must show up in the replies.
        for expected in ("Cool Song", "DJ"):
            assert any(expected in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestVolumeCommand
# ---------------------------------------------------------------------------
class TestVolumeCommand:
    """!volume: display, absolute values, +/- adjustments and bad input."""

    def test_volume_show(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume")))
        assert any("50%" in line for line in bot.replied)

    def test_volume_set(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 75")))
        assert _mod._ps(bot)["volume"] == 75
        assert any("75%" in line for line in bot.replied)

    def test_volume_out_of_range(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume 150")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_negative_absolute(self):
        """Bare negative that underflows clamps at 0-100 error."""
        # NOTE(review): same setup and assertion as
        # test_volume_relative_clamp_under below.
        bot = _FakeBot()
        _mod._ps(bot)["volume"] = 5
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume -10")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_relative_up(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume +15")))
        assert _mod._ps(bot)["volume"] == 65
        assert any("65%" in line for line in bot.replied)

    def test_volume_relative_down(self):
        bot = _FakeBot()
        _mod._ps(bot)["volume"] = 80
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume -20")))
        assert _mod._ps(bot)["volume"] == 60
        assert any("60%" in line for line in bot.replied)

    def test_volume_relative_clamp_over(self):
        bot = _FakeBot()
        _mod._ps(bot)["volume"] = 95
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume +10")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_relative_clamp_under(self):
        bot = _FakeBot()
        _mod._ps(bot)["volume"] = 5
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume -10")))
        assert any("0-100" in line for line in bot.replied)

    def test_volume_invalid(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_volume(bot, _Msg(text="!volume abc")))
        assert any("Usage" in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestPerBotState
# ---------------------------------------------------------------------------
class TestPerBotState:
    """_ps() defaults, identity across calls, and isolation between bots."""

    def test_ps_initializes(self):
        fresh = _mod._ps(_FakeBot())
        assert fresh["queue"] == []
        assert fresh["current"] is None
        assert fresh["volume"] == 50

    def test_ps_stable_reference(self):
        bot = _FakeBot()
        first = _mod._ps(bot)
        second = _mod._ps(bot)
        assert first is second

    def test_ps_isolated_per_bot(self):
        changed, untouched = _FakeBot(), _FakeBot()
        _mod._ps(changed)["volume"] = 80
        assert _mod._ps(untouched)["volume"] == 50
# ---------------------------------------------------------------------------
# TestMusicHelpers
# ---------------------------------------------------------------------------
class TestMusicHelpers:
    """Checks for the _truncate and _is_url helpers."""

    def test_truncate_short(self):
        assert _mod._truncate("short") == "short"

    def test_truncate_long(self):
        shortened = _mod._truncate("x" * 100)
        assert len(shortened) == 80
        assert shortened.endswith("...")

    def test_is_url_http(self):
        assert _mod._is_url("https://youtube.com/watch?v=abc") is True

    def test_is_url_plain_http(self):
        assert _mod._is_url("http://example.com") is True

    def test_is_url_ytsearch(self):
        # A ytsearch: prefix counts as a URL.
        assert _mod._is_url("ytsearch:classical music") is True

    def test_is_url_search_query(self):
        assert _mod._is_url("classical music") is False

    def test_is_url_single_word(self):
        assert _mod._is_url("jazz") is False
# ---------------------------------------------------------------------------
# TestPlaylistExpansion
# ---------------------------------------------------------------------------
class TestPlaylistExpansion:
    """Multi-track !play handling and parsing of _resolve_tracks output."""

    def test_enqueue_multiple_tracks(self):
        bot = _FakeBot()
        playlist = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=playlist),
            patch.object(_mod, "_ensure_loop"),
        ):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/playlist")))
        assert len(_mod._ps(bot)["queue"]) == 3
        assert any("Queued 3 tracks" in line for line in bot.replied)

    def test_truncate_at_queue_limit(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Playing", requester="a")
        # Leave exactly two free slots in the queue.
        state["queue"] = [
            _mod._Track(url="x", title="t", requester="a")
            for _ in range(_mod._MAX_QUEUE - 2)
        ]
        playlist = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
            ("https://example.com/3", "Track 3"),
            ("https://example.com/4", "Track 4"),
        ]
        with patch.object(_mod, "_resolve_tracks", return_value=playlist):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/playlist")))
        assert len(state["queue"]) == _mod._MAX_QUEUE
        assert any("2 of 4" in line for line in bot.replied)

    def test_start_loop_when_idle(self):
        bot = _FakeBot()
        playlist = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=playlist),
            patch.object(_mod, "_ensure_loop") as loop_starter,
        ):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/playlist")))
        loop_starter.assert_called_once()

    def test_no_loop_start_when_busy(self):
        bot = _FakeBot()
        _mod._ps(bot)["current"] = _mod._Track(url="x", title="Current", requester="a")
        playlist = [
            ("https://example.com/1", "Track 1"),
            ("https://example.com/2", "Track 2"),
        ]
        with (
            patch.object(_mod, "_resolve_tracks", return_value=playlist),
            patch.object(_mod, "_ensure_loop") as loop_starter,
        ):
            asyncio.run(_mod.cmd_play(bot, _Msg(text="!play https://example.com/playlist")))
        loop_starter.assert_not_called()

    def test_resolve_tracks_single_video(self):
        """Subprocess returning a single url+title pair."""
        completed = MagicMock(stdout="https://example.com/v1\nSingle Video\n")
        with patch("subprocess.run", return_value=completed):
            resolved = _mod._resolve_tracks("https://example.com/v1")
        assert resolved == [("https://example.com/v1", "Single Video")]

    def test_resolve_tracks_na_url_fallback(self):
        """--flat-playlist prints NA for single videos; use original URL."""
        completed = MagicMock(stdout="NA\nSingle Video\n")
        with patch("subprocess.run", return_value=completed):
            resolved = _mod._resolve_tracks("https://example.com/v1")
        assert resolved == [("https://example.com/v1", "Single Video")]

    def test_resolve_tracks_playlist(self):
        """Subprocess returning multiple url+title pairs."""
        completed = MagicMock(
            stdout=(
                "https://example.com/1\nFirst\n"
                "https://example.com/2\nSecond\n"
            ),
        )
        with patch("subprocess.run", return_value=completed):
            resolved = _mod._resolve_tracks("https://example.com/pl")
        assert len(resolved) == 2
        assert resolved[0] == ("https://example.com/1", "First")
        assert resolved[1] == ("https://example.com/2", "Second")

    def test_resolve_tracks_error_fallback(self):
        """On error, returns [(url, url)]."""
        bad_url = "https://example.com/bad"
        with patch("subprocess.run", side_effect=Exception("fail")):
            resolved = _mod._resolve_tracks(bad_url)
        assert resolved == [(bad_url, bad_url)]

    def test_resolve_tracks_empty_output(self):
        """Empty stdout returns fallback."""
        empty_url = "https://example.com/empty"
        with patch("subprocess.run", return_value=MagicMock(stdout="")):
            resolved = _mod._resolve_tracks(empty_url)
        assert resolved == [(empty_url, empty_url)]
# ---------------------------------------------------------------------------
# TestResumeState
# ---------------------------------------------------------------------------
class TestResumeState:
    """Round-tripping and validation of the saved resume record."""

    def test_save_load_roundtrip(self):
        bot = _FakeBot()
        song = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, song, 125.5)
        loaded = _mod._load_resume(bot)
        assert loaded is not None
        assert loaded["url"] == "https://example.com/a"
        assert loaded["title"] == "Song"
        assert loaded["requester"] == "Alice"
        assert loaded["elapsed"] == 125.5

    def test_clear_removes_state(self):
        bot = _FakeBot()
        song = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, song, 60.0)
        _mod._clear_resume(bot)
        assert _mod._load_resume(bot) is None

    def test_load_returns_none_when_empty(self):
        assert _mod._load_resume(_FakeBot()) is None

    def test_load_returns_none_on_corrupt_json(self):
        bot = _FakeBot()
        # Write an unparsable value straight into the state store.
        bot.state.set("music", "resume", "not-json{{{")
        assert _mod._load_resume(bot) is None

    def test_load_returns_none_on_missing_url(self):
        bot = _FakeBot()
        # Valid JSON, but without a "url" entry.
        bot.state.set("music", "resume", '{"title": "x"}')
        assert _mod._load_resume(bot) is None
# ---------------------------------------------------------------------------
# TestResumeCommand
# ---------------------------------------------------------------------------
class TestResumeCommand:
    """!resume: refusals, and the successful reload of a saved track."""

    def test_nothing_saved(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_resume(bot, _Msg(text="!resume")))
        assert any("Nothing to resume" in line for line in bot.replied)

    def test_already_playing(self):
        bot = _FakeBot()
        _mod._ps(bot)["current"] = _mod._Track(url="x", title="Playing", requester="a")
        asyncio.run(_mod.cmd_resume(bot, _Msg(text="!resume")))
        assert any("Already playing" in line for line in bot.replied)

    def test_non_mumble(self):
        plain_bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_resume(plain_bot, _Msg(text="!resume")))
        assert any("Mumble-only" in line for line in plain_bot.replied)

    def test_loads_track_and_seeks(self):
        bot = _FakeBot()
        song = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, song, 225.0)
        with patch.object(_mod, "_ensure_loop") as loop_starter:
            asyncio.run(_mod.cmd_resume(bot, _Msg(text="!resume")))
        # The saved elapsed time is handed to the loop as the seek offset.
        loop_starter.assert_called_once_with(bot, seek=225.0)
        queue = _mod._ps(bot)["queue"]
        assert len(queue) == 1
        assert queue[0].url == "https://example.com/a"
        assert any("Resuming" in line for line in bot.replied)

    def test_time_format_in_reply(self):
        bot = _FakeBot()
        song = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, song, 225.0)
        with patch.object(_mod, "_ensure_loop"):
            asyncio.run(_mod.cmd_resume(bot, _Msg(text="!resume")))
        assert any("3:45" in line for line in bot.replied)

    def test_clears_resume_state_after_loading(self):
        bot = _FakeBot()
        song = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, song, 60.0)
        with patch.object(_mod, "_ensure_loop"):
            asyncio.run(_mod.cmd_resume(bot, _Msg(text="!resume")))
        assert _mod._load_resume(bot) is None
# ---------------------------------------------------------------------------
# TestFmtTime
# ---------------------------------------------------------------------------
class TestFmtTime:
    """_fmt_time renders seconds as M:SS; minutes are not folded into hours."""

    def test_zero(self):
        formatted = _mod._fmt_time(0)
        assert formatted == "0:00"

    def test_seconds_only(self):
        formatted = _mod._fmt_time(45)
        assert formatted == "0:45"

    def test_minutes_and_seconds(self):
        formatted = _mod._fmt_time(225)
        assert formatted == "3:45"

    def test_large_value(self):
        # 3661 s stays in minutes: 61:01.
        formatted = _mod._fmt_time(3661)
        assert formatted == "61:01"
# ---------------------------------------------------------------------------
# TestDuckCommand
# ---------------------------------------------------------------------------
class TestDuckCommand:
    """!duck: status display, on/off toggle and the tunable settings."""

    def test_show_status(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck")))
        for fragment in ("Duck:", "floor=1%", "restore=30s"):
            assert any(fragment in line for line in bot.replied)

    def test_toggle_on(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["duck_enabled"] = False
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck on")))
        assert state["duck_enabled"] is True
        assert any("enabled" in line for line in bot.replied)

    def test_toggle_off(self):
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["duck_enabled"] = True
        state["duck_vol"] = 5.0
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck off")))
        assert state["duck_enabled"] is False
        # Switching off also resets the ducked volume.
        assert state["duck_vol"] is None
        assert any("disabled" in line for line in bot.replied)

    def test_set_floor(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck floor 10")))
        assert _mod._ps(bot)["duck_floor"] == 10
        assert any("10%" in line for line in bot.replied)

    def test_set_floor_invalid(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck floor 200")))
        assert any("0-100" in line for line in bot.replied)

    def test_set_silence(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck silence 30")))
        assert _mod._ps(bot)["duck_silence"] == 30
        assert any("30s" in line for line in bot.replied)

    def test_set_restore(self):
        bot = _FakeBot()
        asyncio.run(_mod.cmd_duck(bot, _Msg(text="!duck restore 45")))
        assert _mod._ps(bot)["duck_restore"] == 45
        assert any("45s" in line for line in bot.replied)

    def test_non_mumble(self):
        plain_bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_duck(plain_bot, _Msg(text="!duck")))
        assert any("Mumble-only" in line for line in plain_bot.replied)
# ---------------------------------------------------------------------------
# TestDuckMonitor
# ---------------------------------------------------------------------------
class TestDuckMonitor:
    """Drive ``_duck_monitor`` as a real task and inspect ``duck_vol``.

    Each test starts the monitor, lets it run for a short real-time delay,
    checks the per-bot state and then stops the task.  Stopping happens in a
    ``finally`` block so the monitor is cancelled even when an assertion
    fails.
    """

    @staticmethod
    async def _stop(task):
        """Cancel *task* and wait until the cancellation has been delivered."""
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    def test_voice_detected_ducks_to_floor(self):
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["duck_enabled"] = True
        ps["duck_floor"] = 5
        bot._last_voice_ts = time.monotonic()

        async def _check():
            task = asyncio.create_task(_mod._duck_monitor(bot))
            try:
                await asyncio.sleep(1.5)
                assert ps["duck_vol"] == 5.0
            finally:
                await self._stop(task)

        asyncio.run(_check())

    def test_silence_begins_smooth_restore(self):
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["duck_enabled"] = True
        ps["duck_floor"] = 1
        ps["duck_restore"] = 10  # 10s total restore
        ps["volume"] = 50
        bot._last_voice_ts = time.monotonic() - 100
        ps["duck_vol"] = 1.0  # already ducked

        async def _check():
            task = asyncio.create_task(_mod._duck_monitor(bot))
            try:
                await asyncio.sleep(1.5)
                # After ~1s into a 10s ramp from 1->50, vol should be ~5-6
                vol = ps["duck_vol"]
                assert vol is not None and vol > 1.0
            finally:
                await self._stop(task)

        asyncio.run(_check())

    def test_full_restore_sets_none(self):
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["duck_enabled"] = True
        ps["duck_floor"] = 1
        ps["duck_restore"] = 1  # 1s restore -- completes quickly
        ps["volume"] = 50
        bot._last_voice_ts = time.monotonic() - 100
        ps["duck_vol"] = 1.0

        async def _check():
            task = asyncio.create_task(_mod._duck_monitor(bot))
            try:
                # First tick starts restore, second tick sees elapsed >= dur
                await asyncio.sleep(2.5)
                assert ps["duck_vol"] is None
            finally:
                await self._stop(task)

        asyncio.run(_check())

    def test_reduck_during_restore(self):
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["duck_enabled"] = True
        ps["duck_floor"] = 5
        ps["duck_restore"] = 30
        ps["volume"] = 50
        bot._last_voice_ts = time.monotonic() - 100
        ps["duck_vol"] = 30.0  # mid-restore

        async def _check():
            task = asyncio.create_task(_mod._duck_monitor(bot))
            try:
                await asyncio.sleep(0.5)
                # Simulate voice arriving now
                bot._last_voice_ts = time.monotonic()
                await asyncio.sleep(1.5)
                assert ps["duck_vol"] == 5.0  # re-ducked to floor
            finally:
                await self._stop(task)

        asyncio.run(_check())

    def test_disabled_no_ducking(self):
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["duck_enabled"] = False
        bot._last_voice_ts = time.monotonic()

        async def _check():
            task = asyncio.create_task(_mod._duck_monitor(bot))
            try:
                await asyncio.sleep(1.5)
                assert ps["duck_vol"] is None
            finally:
                await self._stop(task)

        asyncio.run(_check())
# ---------------------------------------------------------------------------
# TestAutoResume
# ---------------------------------------------------------------------------
class TestAutoResume:
    """Tests for ``_auto_resume``, ``_reconnect_watcher`` and ``on_connected``.

    The watcher tests run the watcher as a real task; it is always cancelled
    in a ``finally`` block so a failing assertion cannot leave it running.
    """

    @staticmethod
    async def _stop(task):
        """Cancel *task* and wait until the cancellation has been delivered."""
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    @staticmethod
    def _recording_spawn(names):
        """Build a ``_spawn`` replacement that appends task names to *names*."""
        def fake_spawn(coro, *, name=None):
            task = MagicMock()
            task.done.return_value = False
            names.append(name)
            # Close the coroutine to avoid RuntimeWarning
            coro.close()
            return task
        return fake_spawn

    def test_resume_on_silence(self):
        """Auto-resume loads saved state when channel is silent."""
        bot = _FakeBot()
        bot._connect_count = 2
        bot._last_voice_ts = 0.0
        track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, track, 120.0)
        with patch.object(_mod, "_ensure_loop") as mock_loop:
            asyncio.run(_mod._auto_resume(bot))
        mock_loop.assert_called_once_with(bot, seek=120.0)
        ps = _mod._ps(bot)
        assert len(ps["queue"]) == 1
        assert ps["queue"][0].url == "https://example.com/a"
        # Resume state cleared after loading
        assert _mod._load_resume(bot) is None

    def test_no_resume_if_playing(self):
        """Auto-resume returns early when already playing."""
        bot = _FakeBot()
        ps = _mod._ps(bot)
        ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
        track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, track, 60.0)
        with patch.object(_mod, "_ensure_loop") as mock_loop:
            asyncio.run(_mod._auto_resume(bot))
        mock_loop.assert_not_called()

    def test_no_resume_if_no_state(self):
        """Auto-resume returns early when nothing is saved."""
        bot = _FakeBot()
        bot._last_voice_ts = 0.0
        with patch.object(_mod, "_ensure_loop") as mock_loop:
            asyncio.run(_mod._auto_resume(bot))
        mock_loop.assert_not_called()

    def test_abort_if_voice_active(self):
        """Auto-resume aborts if voice never goes silent within deadline."""
        bot = _FakeBot()
        now = time.monotonic()
        bot._last_voice_ts = now
        ps = _mod._ps(bot)
        ps["duck_silence"] = 15
        track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, track, 60.0)

        async def _check():
            # Patch monotonic to jump past the 60s deadline; keep voice active
            mono_val = [now]
            _real_sleep = asyncio.sleep

            def _fast_mono():
                return mono_val[0]

            async def _fast_sleep(s):
                # Advance the fake clock instead of waiting, and pretend that
                # somebody spoke at the new "now".
                mono_val[0] += s
                bot._last_voice_ts = mono_val[0]
                await _real_sleep(0)

            with (
                patch.object(time, "monotonic", side_effect=_fast_mono),
                patch("asyncio.sleep", side_effect=_fast_sleep),
                patch.object(_mod, "_ensure_loop") as mock_loop,
            ):
                await _mod._auto_resume(bot)
                mock_loop.assert_not_called()

        asyncio.run(_check())

    def test_reconnect_watcher_triggers_resume(self):
        """Watcher detects connect_count increment and calls _auto_resume."""
        bot = _FakeBot()
        bot._connect_count = 1

        async def _check():
            with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
                task = asyncio.create_task(_mod._reconnect_watcher(bot))
                try:
                    await asyncio.sleep(0.5)
                    # Simulate reconnection
                    bot._connect_count = 2
                    await asyncio.sleep(3)
                    mock_ar.assert_called_once_with(bot)
                finally:
                    await self._stop(task)

        asyncio.run(_check())

    def test_watcher_ignores_first_connect(self):
        """Watcher does not trigger on initial connection (count 0->1) without saved state."""
        bot = _FakeBot()
        bot._connect_count = 0

        async def _check():
            with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
                task = asyncio.create_task(_mod._reconnect_watcher(bot))
                try:
                    await asyncio.sleep(0.5)
                    bot._connect_count = 1
                    await asyncio.sleep(3)
                    mock_ar.assert_not_called()
                finally:
                    await self._stop(task)

        asyncio.run(_check())

    def test_watcher_boot_resume_with_saved_state(self):
        """Watcher triggers boot-resume on first connect when state exists."""
        bot = _FakeBot()
        bot._connect_count = 0
        track = _mod._Track(url="https://example.com/a", title="Song", requester="Alice")
        _mod._save_resume(bot, track, 30.0)

        async def _check():
            with patch.object(_mod, "_auto_resume", new_callable=AsyncMock) as mock_ar:
                task = asyncio.create_task(_mod._reconnect_watcher(bot))
                try:
                    await asyncio.sleep(0.5)
                    bot._connect_count = 1
                    await asyncio.sleep(3)
                    mock_ar.assert_called_once_with(bot)
                finally:
                    await self._stop(task)

        asyncio.run(_check())

    def test_on_connected_starts_watcher(self):
        """on_connected() starts the reconnect watcher task."""
        bot = _FakeBot()
        spawned = []
        bot._spawn = self._recording_spawn(spawned)
        asyncio.run(_mod.on_connected(bot))
        assert "music-reconnect-watcher" in spawned
        ps = _mod._ps(bot)
        assert ps["_watcher_task"] is not None

    def test_on_connected_no_double_start(self):
        """on_connected() does not start a second watcher."""
        bot = _FakeBot()
        spawned = []
        bot._spawn = self._recording_spawn(spawned)
        asyncio.run(_mod.on_connected(bot))
        asyncio.run(_mod.on_connected(bot))
        assert spawned.count("music-reconnect-watcher") == 1
# ---------------------------------------------------------------------------
# TestDownloadTrack
# ---------------------------------------------------------------------------
class TestDownloadTrack:
    """_download_track against a temporary music dir and a faked subprocess."""

    def test_download_success(self, tmp_path):
        """Successful download returns a Path."""
        music_dir = tmp_path / "music"
        # The reported file has to exist for the download to count as found.
        music_dir.mkdir(parents=True)
        (music_dir / "abc123.opus").write_bytes(b"audio")
        completed = MagicMock(stdout=str(music_dir / "abc123.opus") + "\n", returncode=0)
        with (
            patch.object(_mod, "_MUSIC_DIR", music_dir),
            patch("subprocess.run", return_value=completed),
        ):
            found = _mod._download_track("https://example.com/v", "abc123")
        assert found is not None
        assert found.name == "abc123.opus"

    def test_download_fallback_glob(self, tmp_path):
        """Falls back to glob when --print output is empty."""
        music_dir = tmp_path / "music"
        music_dir.mkdir(parents=True)
        (music_dir / "abc123.webm").write_bytes(b"audio")
        completed = MagicMock(stdout="", returncode=0)
        with (
            patch.object(_mod, "_MUSIC_DIR", music_dir),
            patch("subprocess.run", return_value=completed),
        ):
            found = _mod._download_track("https://example.com/v", "abc123")
        assert found is not None
        assert found.name == "abc123.webm"

    def test_download_failure_returns_none(self, tmp_path):
        """Exception during download returns None."""
        music_dir = tmp_path / "music"
        with (
            patch.object(_mod, "_MUSIC_DIR", music_dir),
            patch("subprocess.run", side_effect=Exception("fail")),
        ):
            found = _mod._download_track("https://example.com/v", "abc123")
        assert found is None

    def test_download_no_file_returns_none(self, tmp_path):
        """No matching file on disk returns None."""
        music_dir = tmp_path / "music"
        music_dir.mkdir(parents=True)
        completed = MagicMock(stdout="/nonexistent/path.opus\n", returncode=0)
        with (
            patch.object(_mod, "_MUSIC_DIR", music_dir),
            patch("subprocess.run", return_value=completed),
        ):
            found = _mod._download_track("https://example.com/v", "abc123")
        assert found is None
# ---------------------------------------------------------------------------
# TestCleanupTrack
# ---------------------------------------------------------------------------
class TestCleanupTrack:
    """_cleanup_track removes a track's local file unless keep is set."""

    def test_cleanup_deletes_file(self, tmp_path):
        """Cleanup deletes the local file when keep=False."""
        audio = tmp_path / "test.opus"
        audio.write_bytes(b"audio")
        disposable = _mod._Track(
            url="x", title="t", requester="a",
            local_path=audio, keep=False,
        )
        _mod._cleanup_track(disposable)
        assert not audio.exists()

    def test_cleanup_keeps_file_when_flagged(self, tmp_path):
        """Cleanup preserves the file when keep=True."""
        audio = tmp_path / "test.opus"
        audio.write_bytes(b"audio")
        pinned = _mod._Track(
            url="x", title="t", requester="a",
            local_path=audio, keep=True,
        )
        _mod._cleanup_track(pinned)
        assert audio.exists()

    def test_cleanup_noop_when_no_path(self):
        """Cleanup does nothing when local_path is None."""
        # Passing means the call did not raise.
        _mod._cleanup_track(_mod._Track(url="x", title="t", requester="a"))
# ---------------------------------------------------------------------------
# TestKeepCommand
# ---------------------------------------------------------------------------
class TestKeepCommand:
    """Replies and side effects of ``cmd_keep`` (``!keep``)."""

    def test_keep_nothing_playing(self):
        """No current track: the reply mentions "Nothing playing"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_keep(bot, _Msg(text="!keep")))
        assert any("Nothing playing" in line for line in bot.replied)

    def test_keep_no_local_file(self):
        """Current track has no local path: the reply mentions "No local file"."""
        bot = _FakeBot()
        _mod._ps(bot)["current"] = _mod._Track(url="x", title="t", requester="a")
        asyncio.run(_mod.cmd_keep(bot, _Msg(text="!keep")))
        assert any("No local file" in line for line in bot.replied)

    def test_keep_marks_track(self, tmp_path):
        """Current track with a local file gets ``keep`` set and a "Keeping" reply."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        audio = tmp_path / "abc123.opus"
        audio.write_bytes(b"audio")
        playing = _mod._Track(url="x", title="t", requester="a", local_path=audio)
        state["current"] = playing
        asyncio.run(_mod.cmd_keep(bot, _Msg(text="!keep")))
        assert playing.keep is True
        assert any("Keeping" in line for line in bot.replied)

    def test_keep_non_mumble(self):
        """A bot built with ``mumble=False`` gets a "Mumble-only" reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_keep(bot, _Msg(text="!keep")))
        assert any("Mumble-only" in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestKeptCommand
# ---------------------------------------------------------------------------
class TestKeptCommand:
    """Replies and side effects of ``cmd_kept`` (``!kept`` / ``!kept clear``)."""

    def test_kept_empty(self, tmp_path):
        """Music dir pointing at a missing path: reply mentions "No kept files"."""
        bot = _FakeBot()
        missing = tmp_path / "empty"  # deliberately never created
        with patch.object(_mod, "_MUSIC_DIR", missing):
            asyncio.run(_mod.cmd_kept(bot, _Msg(text="!kept")))
        assert any("No kept files" in line for line in bot.replied)

    def test_kept_lists_files(self, tmp_path):
        """A file in the music dir is listed by name under "Kept files"."""
        bot = _FakeBot()
        music_dir = tmp_path / "music"
        music_dir.mkdir()
        (music_dir / "abc123.opus").write_bytes(b"x" * 1024)
        with patch.object(_mod, "_MUSIC_DIR", music_dir):
            asyncio.run(_mod.cmd_kept(bot, _Msg(text="!kept")))
        replies = bot.replied
        assert any("Kept files" in line for line in replies)
        assert any("abc123.opus" in line for line in replies)

    def test_kept_clear(self, tmp_path):
        """``!kept clear`` reports two deletions and leaves the directory empty."""
        bot = _FakeBot()
        music_dir = tmp_path / "music"
        music_dir.mkdir()
        for filename in ("abc123.opus", "def456.webm"):
            (music_dir / filename).write_bytes(b"audio")
        with patch.object(_mod, "_MUSIC_DIR", music_dir):
            asyncio.run(_mod.cmd_kept(bot, _Msg(text="!kept clear")))
        assert any("Deleted 2 file(s)" in line for line in bot.replied)
        assert not any(music_dir.iterdir())

    def test_kept_non_mumble(self):
        """A bot built with ``mumble=False`` gets a "Mumble-only" reply."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_kept(bot, _Msg(text="!kept")))
        assert any("Mumble-only" in line for line in bot.replied)
# ---------------------------------------------------------------------------
# TestParseSeek
# ---------------------------------------------------------------------------
class TestParseSeek:
    """Expected ``(mode, seconds)`` results of ``_parse_seek`` per input form."""

    def test_absolute_seconds(self):
        """Plain seconds give an ``"abs"`` result."""
        parsed = _mod._parse_seek("90")
        assert parsed == ("abs", 90.0)

    def test_absolute_mss(self):
        """``M:SS`` gives an ``"abs"`` result in seconds."""
        parsed = _mod._parse_seek("1:30")
        assert parsed == ("abs", 90.0)

    def test_relative_forward(self):
        """A ``+`` prefix gives a positive ``"rel"`` offset."""
        parsed = _mod._parse_seek("+30")
        assert parsed == ("rel", 30.0)

    def test_relative_backward(self):
        """A ``-`` prefix gives a negative ``"rel"`` offset."""
        parsed = _mod._parse_seek("-30")
        assert parsed == ("rel", -30.0)

    def test_relative_mss(self):
        """``+M:SS`` gives a positive ``"rel"`` offset in seconds."""
        parsed = _mod._parse_seek("+1:30")
        assert parsed == ("rel", 90.0)

    def test_relative_backward_mss(self):
        """``-M:SS`` gives a negative ``"rel"`` offset in seconds."""
        parsed = _mod._parse_seek("-1:30")
        assert parsed == ("rel", -90.0)

    def test_invalid_raises(self):
        """Non-numeric input raises ``ValueError``."""
        # Imported locally: the module header does not import pytest.
        import pytest

        garbage = "abc"
        with pytest.raises(ValueError):
            _mod._parse_seek(garbage)

    def test_empty_raises(self):
        """An empty string raises ``ValueError``."""
        # Imported locally: the module header does not import pytest.
        import pytest

        blank = ""
        with pytest.raises(ValueError):
            _mod._parse_seek(blank)
# ---------------------------------------------------------------------------
# TestSeekCommand
# ---------------------------------------------------------------------------
class TestSeekCommand:
    """Replies and playback-restart behaviour of ``cmd_seek`` (``!seek``)."""

    def test_seek_nothing_playing(self):
        """No current track: the reply mentions "Nothing playing"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek 1:30")))
        assert any("Nothing playing" in line for line in bot.replied)

    def test_seek_non_mumble(self):
        """A bot built with ``mumble=False`` receives no reply at all."""
        bot = _FakeBot(mumble=False)
        asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek 1:30")))
        assert bot.replied == []

    def test_seek_no_arg(self):
        """Missing argument: the reply mentions "Usage"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek")))
        assert any("Usage" in line for line in bot.replied)

    def test_seek_invalid_arg(self):
        """Unparseable argument: the reply mentions "Usage"."""
        bot = _FakeBot()
        asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek xyz")))
        assert any("Usage" in line for line in bot.replied)

    def test_seek_absolute(self):
        """Absolute seek cancels the task, re-queues the track, restarts at 90s."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        song = _mod._Track(url="x", title="Song", requester="a")
        state["current"] = song
        # Stand-in for the running playback task: reports itself as not done.
        playback = MagicMock(**{"done.return_value": False})
        state["task"] = playback
        with patch.object(_mod, "_ensure_loop") as ensure_loop:
            asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek 1:30")))
        ensure_loop.assert_called_once_with(bot, seek=90.0)
        assert state["queue"][0] is song
        assert any("1:30" in line for line in bot.replied)
        playback.cancel.assert_called_once()

    def test_seek_relative_forward(self):
        """``+30`` is applied on top of the elapsed position."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Song", requester="a")
        state["cur_seek"] = 60.0  # started at 60s
        state["progress"] = [1500]  # 1500 * 0.02 = 30s
        state["task"] = MagicMock(**{"done.return_value": False})
        with patch.object(_mod, "_ensure_loop") as ensure_loop:
            asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek +30")))
        # elapsed = 60 + 30 = 90, target = 90 + 30 = 120
        ensure_loop.assert_called_once_with(bot, seek=120.0)

    def test_seek_relative_backward_clamps(self):
        """``-30`` from 10s elapsed is clamped to zero rather than going negative."""
        bot = _FakeBot()
        state = _mod._ps(bot)
        state["current"] = _mod._Track(url="x", title="Song", requester="a")
        state["cur_seek"] = 0.0
        state["progress"] = [500]  # 500 * 0.02 = 10s
        state["task"] = MagicMock(**{"done.return_value": False})
        with patch.object(_mod, "_ensure_loop") as ensure_loop:
            asyncio.run(_mod.cmd_seek(bot, _Msg(text="!seek -30")))
        # elapsed = 0 + 10 = 10, target = 10 - 30 = -20, clamped to 0
        ensure_loop.assert_called_once_with(bot, seek=0.0)
# ---------------------------------------------------------------------------
# TestVolumePersistence
# ---------------------------------------------------------------------------
class TestVolumePersistence:
    """Volume is written to ``bot.state`` by ``cmd_volume`` and read by ``on_connected``."""

    @staticmethod
    def _connect_with_saved_volume(saved: str) -> dict:
        """Run ``on_connected`` on a bot whose stored volume is *saved*.

        Replaces the three identical inline ``fake_spawn`` copies the tests
        used to carry (each with a ``spawned`` list that was never read).

        Args:
            saved: Raw string stored under ``("music", "volume")`` in bot state.

        Returns:
            The plugin state returned by ``_ps`` after ``on_connected`` has run.
        """
        bot = _FakeBot()
        bot.state.set("music", "volume", saved)

        def fake_spawn(coro, *, name=None):
            # The spawned coroutine is never run here; close it so it is
            # not left un-awaited when the test finishes.
            coro.close()
            task = MagicMock()
            task.done.return_value = False
            return task

        bot._spawn = fake_spawn
        asyncio.run(_mod.on_connected(bot))
        return _mod._ps(bot)

    def test_volume_persists_to_state(self):
        """``!volume 75`` stores the string ``"75"`` under music/volume."""
        bot = _FakeBot()
        msg = _Msg(text="!volume 75")
        asyncio.run(_mod.cmd_volume(bot, msg))
        assert bot.state.get("music", "volume") == "75"

    def test_volume_loads_on_connect(self):
        """A stored ``"80"`` becomes volume 80 after connect."""
        ps = self._connect_with_saved_volume("80")
        assert ps["volume"] == 80

    def test_volume_loads_clamps_high(self):
        """A stored ``"200"`` is clamped to 100 after connect."""
        ps = self._connect_with_saved_volume("200")
        assert ps["volume"] == 100

    def test_volume_loads_ignores_invalid(self):
        """A non-numeric stored value leaves the volume at its default."""
        ps = self._connect_with_saved_volume("notanumber")
        assert ps["volume"] == 50  # default unchanged