feat: expand YouTube playlists into individual queue tracks
_resolve_title replaced with _resolve_tracks using --flat-playlist to enumerate playlist entries. cmd_play enqueues each track individually, with truncation when the queue is nearly full. Single-video behavior unchanged.
This commit is contained in:
@@ -51,17 +51,33 @@ def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
|
||||
return text[: max_len - 3].rstrip() + "..."
|
||||
|
||||
|
||||
def _resolve_title(url: str) -> str:
|
||||
"""Resolve track title via yt-dlp. Blocking, run in executor."""
|
||||
def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, str]]:
|
||||
"""Resolve URL into (url, title) pairs via yt-dlp. Blocking, run in executor.
|
||||
|
||||
Handles both single videos and playlists. For playlists, returns up to
|
||||
``max_tracks`` individual entries. Falls back to ``[(url, url)]`` on error.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["yt-dlp", "--get-title", "--no-warnings", url],
|
||||
capture_output=True, text=True, timeout=15,
|
||||
[
|
||||
"yt-dlp", "--flat-playlist", "--print", "url",
|
||||
"--print", "title", "--no-warnings",
|
||||
f"--playlist-end={max_tracks}", url,
|
||||
],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
title = result.stdout.strip()
|
||||
return title if title else url
|
||||
lines = result.stdout.strip().splitlines()
|
||||
if len(lines) < 2:
|
||||
return [(url, url)]
|
||||
tracks = []
|
||||
for i in range(0, len(lines) - 1, 2):
|
||||
track_url = lines[i].strip()
|
||||
track_title = lines[i + 1].strip()
|
||||
if track_url:
|
||||
tracks.append((track_url, track_title or track_url))
|
||||
return tracks if tracks else [(url, url)]
|
||||
except Exception:
|
||||
return url
|
||||
return [(url, url)]
|
||||
|
||||
|
||||
# -- Play loop ---------------------------------------------------------------
|
||||
@@ -110,12 +126,15 @@ def _ensure_loop(bot) -> None:
|
||||
# -- Commands ----------------------------------------------------------------
|
||||
|
||||
|
||||
@command("play", help="Music: !play <url>")
|
||||
@command("play", help="Music: !play <url|playlist>")
|
||||
async def cmd_play(bot, message):
|
||||
"""Play a URL or add to queue if already playing.
|
||||
|
||||
Usage:
|
||||
!play <url> Play audio from URL (YouTube, SoundCloud, etc.)
|
||||
|
||||
Playlists are expanded into individual tracks. If the queue is nearly
|
||||
full, only as many tracks as will fit are enqueued.
|
||||
"""
|
||||
if not _is_mumble(bot):
|
||||
await bot.reply(message, "Music playback is Mumble-only")
|
||||
@@ -133,20 +152,36 @@ async def cmd_play(bot, message):
|
||||
await bot.reply(message, f"Queue full ({_MAX_QUEUE} tracks)")
|
||||
return
|
||||
|
||||
remaining = _MAX_QUEUE - len(ps["queue"])
|
||||
loop = asyncio.get_running_loop()
|
||||
title = await loop.run_in_executor(None, _resolve_title, url)
|
||||
track = _Track(url=url, title=title, requester=message.nick or "?")
|
||||
resolved = await loop.run_in_executor(None, _resolve_tracks, url, remaining)
|
||||
|
||||
ps["queue"].append(track)
|
||||
was_idle = ps["current"] is None
|
||||
requester = message.nick or "?"
|
||||
added = 0
|
||||
for track_url, track_title in resolved[:remaining]:
|
||||
ps["queue"].append(_Track(url=track_url, title=track_title,
|
||||
requester=requester))
|
||||
added += 1
|
||||
|
||||
if ps["current"] is not None:
|
||||
pos = len(ps["queue"])
|
||||
total_resolved = len(resolved)
|
||||
|
||||
if added == 1:
|
||||
title = _truncate(resolved[0][1])
|
||||
if was_idle:
|
||||
await bot.reply(message, f"Playing: {title}")
|
||||
else:
|
||||
pos = len(ps["queue"])
|
||||
await bot.reply(message, f"Queued #{pos}: {title}")
|
||||
elif added < total_resolved:
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Queued #{pos}: {_truncate(title)}",
|
||||
f"Queued {added} of {total_resolved} tracks (queue full)",
|
||||
)
|
||||
else:
|
||||
await bot.reply(message, f"Playing: {_truncate(title)}")
|
||||
await bot.reply(message, f"Queued {added} tracks")
|
||||
|
||||
if was_idle:
|
||||
_ensure_loop(bot)
|
||||
|
||||
|
||||
|
||||
@@ -125,7 +125,8 @@ class TestPlayCommand:
|
||||
def test_play_queues_track(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!play https://example.com/track")
|
||||
with patch.object(_mod, "_resolve_title", return_value="Test Track"):
|
||||
tracks = [("https://example.com/track", "Test Track")]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
assert any("Playing" in r for r in bot.replied)
|
||||
@@ -140,7 +141,8 @@ class TestPlayCommand:
|
||||
url="x", title="Current", requester="Bob",
|
||||
)
|
||||
msg = _Msg(text="!play https://example.com/next")
|
||||
with patch.object(_mod, "_resolve_title", return_value="Next Track"):
|
||||
tracks = [("https://example.com/next", "Next Track")]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
assert any("Queued" in r for r in bot.replied)
|
||||
|
||||
@@ -152,8 +154,7 @@ class TestPlayCommand:
|
||||
for _ in range(_mod._MAX_QUEUE)
|
||||
]
|
||||
msg = _Msg(text="!play https://example.com/overflow")
|
||||
with patch.object(_mod, "_resolve_title", return_value="Overflow"):
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
assert any("full" in r.lower() for r in bot.replied)
|
||||
|
||||
|
||||
@@ -241,7 +242,8 @@ class TestQueueCommand:
|
||||
def test_queue_with_url_delegates(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!queue https://example.com/track")
|
||||
with patch.object(_mod, "_resolve_title", return_value="Title"):
|
||||
tracks = [("https://example.com/track", "Title")]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_queue(bot, msg))
|
||||
# Should have called cmd_play logic
|
||||
@@ -351,3 +353,107 @@ class TestMusicHelpers:
|
||||
result = _mod._truncate(long)
|
||||
assert len(result) == 80
|
||||
assert result.endswith("...")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestPlaylistExpansion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPlaylistExpansion:
|
||||
def test_enqueue_multiple_tracks(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!play https://example.com/playlist")
|
||||
tracks = [
|
||||
("https://example.com/1", "Track 1"),
|
||||
("https://example.com/2", "Track 2"),
|
||||
("https://example.com/3", "Track 3"),
|
||||
]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
ps = _mod._ps(bot)
|
||||
assert len(ps["queue"]) == 3
|
||||
assert any("Queued 3 tracks" in r for r in bot.replied)
|
||||
|
||||
def test_truncate_at_queue_limit(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="x", title="Playing", requester="a")
|
||||
# Fill queue to 2 slots remaining
|
||||
ps["queue"] = [
|
||||
_mod._Track(url="x", title="t", requester="a")
|
||||
for _ in range(_mod._MAX_QUEUE - 2)
|
||||
]
|
||||
msg = _Msg(text="!play https://example.com/playlist")
|
||||
tracks = [
|
||||
("https://example.com/1", "Track 1"),
|
||||
("https://example.com/2", "Track 2"),
|
||||
("https://example.com/3", "Track 3"),
|
||||
("https://example.com/4", "Track 4"),
|
||||
]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
assert len(ps["queue"]) == _mod._MAX_QUEUE
|
||||
assert any("2 of 4" in r for r in bot.replied)
|
||||
|
||||
def test_start_loop_when_idle(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!play https://example.com/playlist")
|
||||
tracks = [
|
||||
("https://example.com/1", "Track 1"),
|
||||
("https://example.com/2", "Track 2"),
|
||||
]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
mock_loop.assert_called_once()
|
||||
|
||||
def test_no_loop_start_when_busy(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="x", title="Current", requester="a")
|
||||
msg = _Msg(text="!play https://example.com/playlist")
|
||||
tracks = [
|
||||
("https://example.com/1", "Track 1"),
|
||||
("https://example.com/2", "Track 2"),
|
||||
]
|
||||
with patch.object(_mod, "_resolve_tracks", return_value=tracks):
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_play(bot, msg))
|
||||
mock_loop.assert_not_called()
|
||||
|
||||
def test_resolve_tracks_single_video(self):
|
||||
"""Subprocess returning a single url+title pair."""
|
||||
result = MagicMock()
|
||||
result.stdout = "https://example.com/v1\nSingle Video\n"
|
||||
with patch("subprocess.run", return_value=result):
|
||||
tracks = _mod._resolve_tracks("https://example.com/v1")
|
||||
assert tracks == [("https://example.com/v1", "Single Video")]
|
||||
|
||||
def test_resolve_tracks_playlist(self):
|
||||
"""Subprocess returning multiple url+title pairs."""
|
||||
result = MagicMock()
|
||||
result.stdout = (
|
||||
"https://example.com/1\nFirst\n"
|
||||
"https://example.com/2\nSecond\n"
|
||||
)
|
||||
with patch("subprocess.run", return_value=result):
|
||||
tracks = _mod._resolve_tracks("https://example.com/pl")
|
||||
assert len(tracks) == 2
|
||||
assert tracks[0] == ("https://example.com/1", "First")
|
||||
assert tracks[1] == ("https://example.com/2", "Second")
|
||||
|
||||
def test_resolve_tracks_error_fallback(self):
|
||||
"""On error, returns [(url, url)]."""
|
||||
with patch("subprocess.run", side_effect=Exception("fail")):
|
||||
tracks = _mod._resolve_tracks("https://example.com/bad")
|
||||
assert tracks == [("https://example.com/bad", "https://example.com/bad")]
|
||||
|
||||
def test_resolve_tracks_empty_output(self):
|
||||
"""Empty stdout returns fallback."""
|
||||
result = MagicMock()
|
||||
result.stdout = ""
|
||||
with patch("subprocess.run", return_value=result):
|
||||
tracks = _mod._resolve_tracks("https://example.com/empty")
|
||||
assert tracks == [("https://example.com/empty", "https://example.com/empty")]
|
||||
|
||||
Reference in New Issue
Block a user