fix: kept file protection, skip/autoplay, TTS routing, video ID expansion
- _cleanup_track: never delete files from kept directory (data/music/) even when track.keep=False -- fixes kept files vanishing on replay - !kept rm: skip to next track if removing the currently playing one - !skip: silent (no reply), restarts play loop for autoplay on empty queue - TTS plays through merlin's own connection instead of derp's, preventing choppy audio when music and TTS compete for the same output buffer - !play recognizes bare YouTube video IDs (11-char alphanumeric) - !kept rm <id> subcommand for removing individual kept tracks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -581,6 +581,7 @@ HTML stripped on receive, escaped on send. IRC-only commands are no-ops.
|
||||
!volume 75 # Set volume (0-100, default 50)
|
||||
!keep # Keep current file + save metadata
|
||||
!kept # List kept files with metadata
|
||||
!kept rm <id> # Remove a single kept track
|
||||
!kept clear # Delete all kept files + metadata
|
||||
!kept repair # Re-download missing kept files
|
||||
!duck # Show ducking status
|
||||
|
||||
+2
-1
@@ -1628,7 +1628,7 @@ and voice transmission.
|
||||
!np Now playing
|
||||
!volume [0-100] Get/set volume (persisted across restarts)
|
||||
!keep Keep current track's audio file (with metadata)
|
||||
!kept [clear|repair] List kept files, clear all, or re-download missing
|
||||
!kept [rm <id>|clear|repair] List, remove, clear, or repair kept files
|
||||
!testtone Play 3-second 440Hz test tone
|
||||
```
|
||||
|
||||
@@ -1750,6 +1750,7 @@ file (natural dedup).
|
||||
duration) is fetched via yt-dlp and stored in `bot.state`
|
||||
- Use `!kept` to list preserved files with metadata (title, artist, duration,
|
||||
file size)
|
||||
- Use `!kept rm <id>` to remove a single kept track (file + metadata)
|
||||
- Use `!kept clear` to delete all preserved files and their metadata
|
||||
- Use `!kept repair` to re-download any kept tracks whose local files are
|
||||
missing (e.g. after a cleanup or volume mount issue)
|
||||
|
||||
+53
-14
@@ -79,6 +79,16 @@ def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
|
||||
return text[: max_len - 3].rstrip() + "..."
|
||||
|
||||
|
||||
_YT_VIDEO_ID_RE = re.compile(r"^[A-Za-z0-9_-]{11}$")
|
||||
|
||||
|
||||
def _expand_video_id(text: str) -> str:
|
||||
"""Expand a bare YouTube video ID to a full URL."""
|
||||
if _YT_VIDEO_ID_RE.match(text):
|
||||
return f"https://www.youtube.com/watch?v={text}"
|
||||
return text
|
||||
|
||||
|
||||
def _is_url(text: str) -> bool:
|
||||
"""Check if text looks like a URL rather than a search query."""
|
||||
return text.startswith(("http://", "https://", "ytsearch:"))
|
||||
@@ -309,9 +319,13 @@ def _download_track(url: str, track_id: str, title: str = "") -> Path | None:
|
||||
|
||||
|
||||
def _cleanup_track(track: _Track) -> None:
|
||||
"""Delete the local audio file unless marked to keep."""
|
||||
"""Delete the local audio file unless marked to keep or in kept dir."""
|
||||
if track.local_path is None or track.keep:
|
||||
return
|
||||
# Never delete files from the kept directory -- they may have been
|
||||
# reused by _download_track for a non-kept playback of the same URL.
|
||||
if track.local_path.parent == _MUSIC_DIR:
|
||||
return
|
||||
try:
|
||||
track.local_path.unlink(missing_ok=True)
|
||||
log.info("music: deleted %s", track.local_path.name)
|
||||
@@ -844,6 +858,9 @@ async def cmd_play(bot, message):
|
||||
_ensure_loop(bot)
|
||||
return
|
||||
|
||||
# Expand bare YouTube video IDs (e.g. "U1yQMjFZ6j4")
|
||||
url = _expand_video_id(url)
|
||||
|
||||
# Strip #random fragment before URL classification / resolution
|
||||
shuffle = False
|
||||
if _is_url(url) and url.endswith("#random"):
|
||||
@@ -1074,14 +1091,7 @@ async def cmd_skip(bot, message):
|
||||
|
||||
await _fade_and_cancel(bot)
|
||||
|
||||
if ps["queue"]:
|
||||
_ensure_loop(bot)
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Skipped: {_truncate(skipped.title)}",
|
||||
)
|
||||
else:
|
||||
await bot.reply(message, "Skipped, queue empty")
|
||||
_ensure_loop(bot)
|
||||
|
||||
|
||||
@command("prev", help="Music: !prev -- play previous track")
|
||||
@@ -1475,14 +1485,15 @@ async def cmd_keep(bot, message):
|
||||
await bot.reply(message, f"Keeping #{keep_id}: {label}")
|
||||
|
||||
|
||||
@command("kept", help="Music: !kept [clear|repair] -- list, clear, or repair kept files")
|
||||
@command("kept", help="Music: !kept [rm <id>|clear|repair] -- manage kept files")
|
||||
async def cmd_kept(bot, message):
|
||||
"""List, clear, or repair kept audio files in data/music/.
|
||||
"""List, clear, remove, or repair kept audio files in data/music/.
|
||||
|
||||
Usage:
|
||||
!kept List kept tracks with metadata and file status
|
||||
!kept clear Delete all kept files and metadata
|
||||
!kept repair Re-download kept tracks whose local files are missing
|
||||
!kept List kept tracks with metadata and file status
|
||||
!kept rm <id> Remove a single kept track by ID
|
||||
!kept clear Delete all kept files and metadata
|
||||
!kept repair Re-download kept tracks whose local files are missing
|
||||
"""
|
||||
if not _is_mumble(bot):
|
||||
await bot.reply(message, "Mumble-only feature")
|
||||
@@ -1491,6 +1502,34 @@ async def cmd_kept(bot, message):
|
||||
parts = message.text.split()
|
||||
sub = parts[1].lower() if len(parts) >= 2 else ""
|
||||
|
||||
if sub == "rm":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !kept rm <id>")
|
||||
return
|
||||
kid = parts[2].lstrip("#")
|
||||
raw = bot.state.get("music", f"keep:{kid}")
|
||||
if not raw:
|
||||
await bot.reply(message, f"No kept track with ID #{kid}")
|
||||
return
|
||||
try:
|
||||
meta = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
meta = {}
|
||||
filename = meta.get("filename", "")
|
||||
if filename:
|
||||
fpath = _MUSIC_DIR / filename
|
||||
fpath.unlink(missing_ok=True)
|
||||
bot.state.delete("music", f"keep:{kid}")
|
||||
title = meta.get("title") or filename or kid
|
||||
await bot.reply(message, f"Removed #{kid}: {_truncate(title)}")
|
||||
# Skip if this track is currently playing
|
||||
ps = _ps(bot)
|
||||
cur = ps.get("current")
|
||||
if cur and filename and cur.local_path and cur.local_path.name == filename:
|
||||
await _fade_and_cancel(bot)
|
||||
_ensure_loop(bot)
|
||||
return
|
||||
|
||||
if sub == "clear":
|
||||
count = 0
|
||||
if _MUSIC_DIR.is_dir():
|
||||
|
||||
+2
-15
@@ -39,17 +39,6 @@ _WHISPER_URL = "http://192.168.129.9:8080/inference"
|
||||
_PIPER_URL = "http://192.168.129.9:5100/"
|
||||
|
||||
|
||||
def _find_voice_peer(bot):
|
||||
"""Find the voice-capable peer (the bot with 'voice' in only_plugins)."""
|
||||
bots = getattr(bot.registry, "_bots", {})
|
||||
for name, b in bots.items():
|
||||
if name == bot._username:
|
||||
continue
|
||||
if getattr(b, "_only_plugins", None) and "voice" in b._only_plugins:
|
||||
return b
|
||||
return None
|
||||
|
||||
|
||||
# -- Per-bot state -----------------------------------------------------------
|
||||
|
||||
|
||||
@@ -184,10 +173,8 @@ async def _flush_monitor(bot):
|
||||
remainder = text[len(trigger):].strip()
|
||||
if remainder:
|
||||
log.info("voice: trigger from %s: %s", name, remainder)
|
||||
# Route TTS through voice-capable peer if available
|
||||
speaker = _find_voice_peer(bot) or bot
|
||||
speaker._spawn(
|
||||
_tts_play(speaker, remainder), name="voice-tts",
|
||||
bot._spawn(
|
||||
_tts_play(bot, remainder), name="voice-tts",
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
+105
-4
@@ -126,6 +126,34 @@ class TestMumbleGuard:
|
||||
assert bot.replied == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestExpandVideoId
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExpandVideoId:
|
||||
def test_bare_video_id(self):
|
||||
assert _mod._expand_video_id("U1yQMjFZ6j4") == \
|
||||
"https://www.youtube.com/watch?v=U1yQMjFZ6j4"
|
||||
|
||||
def test_id_with_hyphens_underscores(self):
|
||||
assert _mod._expand_video_id("dQw4w9WgXcQ") == \
|
||||
"https://www.youtube.com/watch?v=dQw4w9WgXcQ"
|
||||
|
||||
def test_too_short_not_expanded(self):
|
||||
assert _mod._expand_video_id("abc") == "abc"
|
||||
|
||||
def test_too_long_not_expanded(self):
|
||||
assert _mod._expand_video_id("abcdefghijkl") == "abcdefghijkl"
|
||||
|
||||
def test_full_url_not_expanded(self):
|
||||
url = "https://www.youtube.com/watch?v=U1yQMjFZ6j4"
|
||||
assert _mod._expand_video_id(url) == url
|
||||
|
||||
def test_search_query_not_expanded(self):
|
||||
assert _mod._expand_video_id("hello world") == "hello world"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestPlayCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -235,10 +263,10 @@ class TestSkipCommand:
|
||||
msg = _Msg(text="!skip")
|
||||
with patch.object(_mod, "_ensure_loop"):
|
||||
asyncio.run(_mod.cmd_skip(bot, msg))
|
||||
assert any("Skipped" in r for r in bot.replied)
|
||||
assert not bot.replied # skip is silent when queue has next track
|
||||
mock_task.cancel.assert_called_once()
|
||||
|
||||
def test_skip_empty_queue(self):
|
||||
def test_skip_empty_queue_restarts_loop(self):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
ps["current"] = _mod._Track(url="a", title="Only", requester="x")
|
||||
@@ -246,8 +274,9 @@ class TestSkipCommand:
|
||||
mock_task.done.return_value = False
|
||||
ps["task"] = mock_task
|
||||
msg = _Msg(text="!skip")
|
||||
asyncio.run(_mod.cmd_skip(bot, msg))
|
||||
assert any("empty" in r.lower() for r in bot.replied)
|
||||
with patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
asyncio.run(_mod.cmd_skip(bot, msg))
|
||||
mock_loop.assert_called_once() # loop restarted for autoplay
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1507,6 +1536,20 @@ class TestCleanupTrack:
|
||||
track = _mod._Track(url="x", title="t", requester="a")
|
||||
_mod._cleanup_track(track) # should not raise
|
||||
|
||||
def test_cleanup_preserves_kept_dir_files(self, tmp_path):
|
||||
"""Cleanup never deletes files from the kept music directory."""
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
f = music_dir / "kept-track.opus"
|
||||
f.write_bytes(b"audio")
|
||||
track = _mod._Track(
|
||||
url="x", title="t", requester="a",
|
||||
local_path=f, keep=False,
|
||||
)
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
_mod._cleanup_track(track)
|
||||
assert f.exists()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestKeepCommand
|
||||
@@ -1683,6 +1726,64 @@ class TestKeptCommand:
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("MISSING" in r for r in bot.replied)
|
||||
|
||||
def test_kept_rm(self, tmp_path):
|
||||
bot = _FakeBot()
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
(music_dir / "abc123.opus").write_bytes(b"audio")
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"title": "Test Track", "filename": "abc123.opus", "id": 1,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir):
|
||||
msg = _Msg(text="!kept rm 1")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("Removed #1" in r for r in bot.replied)
|
||||
assert bot.state.get("music", "keep:1") is None
|
||||
assert not (music_dir / "abc123.opus").exists()
|
||||
|
||||
def test_kept_rm_with_hash(self, tmp_path):
|
||||
bot = _FakeBot()
|
||||
bot.state.set("music", "keep:3", json.dumps({
|
||||
"title": "Track 3", "filename": "t3.opus", "id": 3,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", tmp_path):
|
||||
msg = _Msg(text="!kept rm #3")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("Removed #3" in r for r in bot.replied)
|
||||
assert bot.state.get("music", "keep:3") is None
|
||||
|
||||
def test_kept_rm_skips_if_playing(self, tmp_path):
|
||||
bot = _FakeBot()
|
||||
ps = _mod._ps(bot)
|
||||
music_dir = tmp_path / "music"
|
||||
music_dir.mkdir()
|
||||
f = music_dir / "abc123.opus"
|
||||
f.write_bytes(b"audio")
|
||||
track = _mod._Track(url="x", title="t", requester="a",
|
||||
local_path=f, keep=True)
|
||||
ps["current"] = track
|
||||
bot.state.set("music", "keep:1", json.dumps({
|
||||
"title": "Test Track", "filename": "abc123.opus", "id": 1,
|
||||
}))
|
||||
with patch.object(_mod, "_MUSIC_DIR", music_dir), \
|
||||
patch.object(_mod, "_ensure_loop") as mock_loop:
|
||||
msg = _Msg(text="!kept rm 1")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("Removed #1" in r for r in bot.replied)
|
||||
mock_loop.assert_called_once() # restarts loop for autoplay
|
||||
|
||||
def test_kept_rm_missing_id(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!kept rm")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("Usage" in r for r in bot.replied)
|
||||
|
||||
def test_kept_rm_not_found(self):
|
||||
bot = _FakeBot()
|
||||
msg = _Msg(text="!kept rm 99")
|
||||
asyncio.run(_mod.cmd_kept(bot, msg))
|
||||
assert any("No kept track" in r for r in bot.replied)
|
||||
|
||||
def test_kept_non_mumble(self):
|
||||
bot = _FakeBot(mumble=False)
|
||||
msg = _Msg(text="!kept")
|
||||
|
||||
Reference in New Issue
Block a user