"""Plugin: music playback for Mumble voice channels.""" from __future__ import annotations import asyncio import hashlib import json import logging import random import re import shutil import subprocess import time from dataclasses import dataclass from pathlib import Path from urllib.parse import parse_qs, urlencode, urlparse, urlunparse from derp.plugin import command log = logging.getLogger(__name__) _MAX_QUEUE = 50 _MAX_TITLE_LEN = 80 @dataclass(slots=True) class _Track: url: str title: str requester: str origin: str = "" # original user-provided URL for re-resolution local_path: Path | None = None # set before playback keep: bool = False # True = don't delete after playback # -- Per-bot runtime state --------------------------------------------------- def _ps(bot): """Per-bot plugin runtime state.""" cfg = getattr(bot, "config", {}).get("music", {}) return bot._pstate.setdefault("music", { "queue": [], "current": None, "volume": 50, "task": None, "done_event": None, "duck_enabled": cfg.get("duck_enabled", True), "duck_floor": cfg.get("duck_floor", 1), "duck_silence": cfg.get("duck_silence", 15), "duck_restore": cfg.get("duck_restore", 30), "duck_vol": None, "duck_task": None, "fade_vol": None, "fade_step": None, "history": [], "autoplay": cfg.get("autoplay", True), "_watcher_task": None, }) # -- Helpers ----------------------------------------------------------------- def _is_mumble(bot) -> bool: """Check if bot supports voice streaming.""" return hasattr(bot, "stream_audio") def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: """Truncate text with ellipsis if needed.""" if len(text) <= max_len: return text return text[: max_len - 3].rstrip() + "..." def _is_url(text: str) -> bool: """Check if text looks like a URL rather than a search query.""" return text.startswith(("http://", "https://", "ytsearch:")) def _fmt_time(seconds: float) -> str: """Format seconds as M:SS.""" m, s = divmod(int(seconds), 60) return f"{m}:{s:02d}" def _parse_seek(arg: str) -> tuple[str, float]: """Parse a seek offset string into (mode, seconds). Returns ``("abs", seconds)`` for absolute seeks (``1:30``, ``90``) or ``("rel", +/-seconds)`` for relative (``+30``, ``-1:00``). Raises ``ValueError`` on invalid input. """ if not arg: raise ValueError("empty seek argument") mode = "abs" raw = arg if raw[0] in ("+", "-"): mode = "rel" sign = -1 if raw[0] == "-" else 1 raw = raw[1:] else: sign = 1 if ":" in raw: parts = raw.split(":", 1) try: minutes = int(parts[0]) seconds = int(parts[1]) except ValueError: raise ValueError(f"invalid seek format: {arg}") total = minutes * 60 + seconds else: try: total = int(raw) except ValueError: raise ValueError(f"invalid seek format: {arg}") return (mode, sign * float(total)) # -- Resume state persistence ------------------------------------------------ def _strip_playlist_params(url: str) -> str: """Strip playlist context params from a YouTube URL. Keeps only the video identifier so resume/download targets the exact video instead of resolving through a radio mix or playlist. """ parsed = urlparse(url) if "youtube.com" not in parsed.netloc and "youtu.be" not in parsed.netloc: return url params = parse_qs(parsed.query, keep_blank_values=True) # Keep only the video ID; drop list, index, start_radio, pp, etc. clean = {k: v for k, v in params.items() if k == "v"} if not clean: return url return urlunparse(parsed._replace(query=urlencode(clean, doseq=True))) def _save_resume(bot, track: _Track, elapsed: float) -> None: """Persist current track and elapsed position for later resumption.""" data = json.dumps({ "url": _strip_playlist_params(track.url), "title": track.title, "requester": track.requester, "elapsed": round(elapsed, 2), }) bot.state.set("music", "resume", data) def _load_resume(bot) -> dict | None: """Load resume data, or None if absent/corrupt.""" raw = bot.state.get("music", "resume") if not raw: return None try: data = json.loads(raw) if not isinstance(data, dict) or "url" not in data: return None return data except (json.JSONDecodeError, TypeError): return None def _clear_resume(bot) -> None: """Remove persisted resume state.""" bot.state.delete("music", "resume") def _resolve_tracks(url: str, max_tracks: int = _MAX_QUEUE) -> list[tuple[str, str]]: """Resolve URL into (url, title) pairs via yt-dlp. Blocking, run in executor. Handles both single videos and playlists. For playlists, returns up to ``max_tracks`` individual entries. Falls back to ``[(url, url)]`` on error. YouTube URLs with ``&list=`` are passed through intact so yt-dlp can resolve the full playlist. Playlist params are only stripped in ``_save_resume()`` where we need the exact video for resume. """ try: result = subprocess.run( [ "yt-dlp", "--flat-playlist", "--print", "url", "--print", "title", "--no-warnings", f"--playlist-end={max_tracks}", url, ], capture_output=True, text=True, timeout=30, ) lines = result.stdout.strip().splitlines() if len(lines) < 2: return [(url, url)] tracks = [] for i in range(0, len(lines) - 1, 2): track_url = lines[i].strip() track_title = lines[i + 1].strip() # --flat-playlist prints "NA" for single videos (no extraction) if not track_url or track_url == "NA": track_url = url tracks.append((track_url, track_title or track_url)) return tracks if tracks else [(url, url)] except Exception: return [(url, url)] # -- Download helpers -------------------------------------------------------- _MUSIC_DIR = Path("data/music") # kept tracks (persistent) _CACHE_DIR = Path("data/music/cache") # temporary playback downloads def _fetch_metadata(url: str) -> dict: """Fetch track metadata via yt-dlp. Blocking -- run in executor.""" try: result = subprocess.run( ["yt-dlp", "--print", "title", "--print", "artist", "--print", "duration", "--no-warnings", "--no-download", "--no-playlist", url], capture_output=True, text=True, timeout=15, ) lines = result.stdout.strip().splitlines() return { "title": lines[0] if len(lines) > 0 else "", "artist": lines[1] if len(lines) > 1 else "", "duration": (float(lines[2]) if len(lines) > 2 and lines[2].replace(".", "", 1).isdigit() else 0), } except Exception: log.warning("music: metadata fetch failed for %s", url) return {"title": "", "artist": "", "duration": 0} def _sanitize_filename(title: str, fallback: str) -> str: """Convert a track title to a clean, filesystem-safe filename. Keeps alphanumeric chars, hyphens, and underscores. Collapses whitespace/separators into single hyphens. Falls back to the URL-based hash if the title produces nothing usable. """ name = re.sub(r"[^\w\s-]", "", title.lower()) name = re.sub(r"[\s_-]+", "-", name).strip("-") if not name: return fallback return name[:80] def _download_track(url: str, track_id: str, title: str = "") -> Path | None: """Download audio to cache dir. Blocking -- run in executor. Checks the kept directory first (reuse kept files). New downloads go to the cache dir and are cleaned up after playback unless kept. """ filename = _sanitize_filename(title, track_id) if title else track_id _MIN_CACHE_SIZE = 100 * 1024 # 100 KB -- skip partial downloads # Reuse existing kept or cached file for d in (_MUSIC_DIR, _CACHE_DIR): for name in (filename, track_id): existing = list(d.glob(f"{name}.*")) if d.is_dir() else [] for f in existing: # Trust kept files; skip suspiciously small cache files if d == _MUSIC_DIR or f.stat().st_size >= _MIN_CACHE_SIZE: return f _CACHE_DIR.mkdir(parents=True, exist_ok=True) template = str(_CACHE_DIR / f"{track_id}.%(ext)s") try: result = subprocess.run( ["yt-dlp", "-f", "bestaudio", "-x", "-c", "--no-overwrites", "--no-playlist", "--no-warnings", "-o", template, "--print", "after_move:filepath", url], capture_output=True, text=True, timeout=300, ) filepath = result.stdout.strip().splitlines()[-1] if result.stdout.strip() else "" if filepath and Path(filepath).is_file(): return Path(filepath) matches = list(_CACHE_DIR.glob(f"{track_id}.*")) return matches[0] if matches else None except Exception: log.exception("download failed for %s", url) return None def _cleanup_track(track: _Track) -> None: """Delete the local audio file unless marked to keep.""" if track.local_path is None or track.keep: return try: track.local_path.unlink(missing_ok=True) log.info("music: deleted %s", track.local_path.name) except OSError: log.warning("music: failed to delete %s", track.local_path) # -- Duck monitor ------------------------------------------------------------ async def _duck_monitor(bot) -> None: """Background task: duck volume when voice is detected, restore on silence. Ducking is immediate (snap to floor). Restoration is a single smooth linear ramp from floor to user volume over ``duck_restore`` seconds. The per-frame volume ramp in ``stream_audio`` further smooths each 1-second update, eliminating audible steps. """ ps = _ps(bot) restore_start: float = 0.0 # monotonic ts when restore began restore_from: float = 0.0 # duck_vol at restore start try: while True: await asyncio.sleep(1) if not ps["duck_enabled"]: if ps["duck_vol"] is not None: ps["duck_vol"] = None restore_start = 0.0 continue ts = getattr(bot.registry, "_voice_ts", 0.0) if ts == 0.0: continue silence = time.monotonic() - ts if silence < ps["duck_silence"]: # Voice active -- duck immediately if ps["duck_vol"] is None: log.info("duck: voice detected, ducking to %d%%", ps["duck_floor"]) ps["duck_vol"] = float(ps["duck_floor"]) restore_start = 0.0 elif ps["duck_vol"] is not None: # Silence exceeded -- smooth linear restore if restore_start == 0.0: restore_start = time.monotonic() restore_from = ps["duck_vol"] log.info("duck: restoring %d%% -> %d%% over %ds", int(restore_from), ps["volume"], ps["duck_restore"]) elapsed = time.monotonic() - restore_start dur = ps["duck_restore"] if dur <= 0 or elapsed >= dur: ps["duck_vol"] = None restore_start = 0.0 else: target = ps["volume"] ps["duck_vol"] = restore_from + (target - restore_from) * (elapsed / dur) except asyncio.CancelledError: ps["duck_vol"] = None # -- Auto-resume on reconnect ------------------------------------------------ async def _auto_resume(bot) -> None: """Wait for silence after reconnect, then resume saved playback.""" ps = _ps(bot) if ps["current"] is not None: return data = _load_resume(bot) if data is None: return elapsed = data.get("elapsed", 0.0) title = _truncate(data.get("title", data["url"])) pos = _fmt_time(elapsed) # Let pymumble fully stabilize after reconnect await asyncio.sleep(10) deadline = time.monotonic() + 60 silence_needed = ps.get("duck_silence", 15) ts = getattr(bot.registry, "_voice_ts", 0.0) if ts != 0.0 and time.monotonic() - ts < silence_needed: await bot.send("0", f"Resuming '{title}' at {pos} once silent for " f"{silence_needed}s") else: await bot.send("0", f"Resuming '{title}' at {pos} in a moment") while time.monotonic() < deadline: await asyncio.sleep(2) ts = getattr(bot.registry, "_voice_ts", 0.0) if ts == 0.0: break if time.monotonic() - ts >= silence_needed: break else: log.info("music: auto-resume aborted, channel not silent after 60s") await bot.send("0", f"Resume of '{title}' aborted -- " "channel not silent") return # Re-check after waiting -- someone may have started playback manually if ps["current"] is not None: return data = _load_resume(bot) if data is None: return elapsed = data.get("elapsed", 0.0) track = _Track( url=data["url"], title=data.get("title", data["url"]), requester=data.get("requester", "?"), ) ps["queue"].insert(0, track) _clear_resume(bot) log.info("music: auto-resuming '%s' from %s", track.title, _fmt_time(elapsed)) _ensure_loop(bot, seek=elapsed) def _load_kept_tracks(bot) -> list[_Track]: """Load all kept tracks from state with valid local files.""" tracks = [] for key in bot.state.keys("music"): if not key.startswith("keep:"): continue raw = bot.state.get("music", key) if not raw: continue try: meta = json.loads(raw) except (json.JSONDecodeError, TypeError): continue filename = meta.get("filename", "") if not filename: continue fpath = _MUSIC_DIR / filename if not fpath.is_file(): continue tracks.append(_Track( url=meta.get("url", str(fpath)), title=meta.get("title") or filename, requester="autoplay", local_path=fpath, keep=True, )) return tracks async def _autoplay_kept(bot) -> None: """Shuffle kept tracks and start playback when idle after reconnect.""" ps = _ps(bot) if ps["current"] is not None: return kept = _load_kept_tracks(bot) if not kept: return # Let pymumble fully stabilize await asyncio.sleep(10) # Wait for silence deadline = time.monotonic() + 60 silence_needed = ps.get("duck_silence", 15) ts = getattr(bot.registry, "_voice_ts", 0.0) if ts != 0.0 and time.monotonic() - ts < silence_needed: await bot.send("0", f"Shuffling {len(kept)} kept tracks once silent") while time.monotonic() < deadline: await asyncio.sleep(2) ts = getattr(bot.registry, "_voice_ts", 0.0) if ts == 0.0: break if time.monotonic() - ts >= silence_needed: break else: log.info("music: autoplay aborted, channel not silent after 60s") return if ps["current"] is not None: return random.shuffle(kept) ps["queue"].extend(kept) log.info("music: autoplay %d kept tracks (shuffled)", len(kept)) _ensure_loop(bot) async def _reconnect_watcher(bot) -> None: """Poll for reconnections and trigger auto-resume. Also handles cold-start resume: if saved state exists on first run, waits for the connection to stabilize then resumes. """ last_seen = getattr(bot, "_connect_count", 0) boot_checked = False while True: await asyncio.sleep(2) count = getattr(bot, "_connect_count", 0) # Cold-start: resume or autoplay after first connection if not boot_checked and count >= 1: boot_checked = True if _load_resume(bot) is not None: log.info("music: saved state found on boot, attempting auto-resume") await _auto_resume(bot) elif _ps(bot).get("autoplay", True): await _autoplay_kept(bot) continue if count > last_seen and count > 1: last_seen = count if _load_resume(bot) is not None: log.info("music: reconnection detected, attempting auto-resume") await _auto_resume(bot) elif _ps(bot).get("autoplay", True): await _autoplay_kept(bot) last_seen = max(last_seen, count) # -- Play loop --------------------------------------------------------------- async def _play_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> None: """Pop tracks from queue and stream them sequentially.""" ps = _ps(bot) duck_task = bot._spawn(_duck_monitor(bot), name="music-duck-monitor") ps["duck_task"] = duck_task first = True seek_req = [None] ps["seek_req"] = seek_req try: while ps["queue"]: track = ps["queue"].pop(0) ps["current"] = track ps["fade_vol"] = None ps["fade_step"] = None done = asyncio.Event() ps["done_event"] = done cur_seek = seek if first else 0.0 if not first: fade_in = True # always fade in after first track first = False progress = [0] ps["progress"] = progress ps["cur_seek"] = cur_seek # Download phase source = track.url if track.local_path is None: loop = asyncio.get_running_loop() tid = hashlib.md5(track.url.encode()).hexdigest()[:12] dl_path = await loop.run_in_executor( None, _download_track, track.url, tid, track.title, ) if dl_path: track.local_path = dl_path source = str(dl_path) else: log.warning("music: download failed, streaming %s", track.url) else: source = str(track.local_path) try: await bot.stream_audio( source, volume=lambda: ( ps["fade_vol"] if ps["fade_vol"] is not None else ps["duck_vol"] if ps["duck_vol"] is not None else ps["volume"] ) / 100.0, on_done=done, seek=cur_seek, progress=progress, fade_step=lambda: ps.get("fade_step"), fade_in=fade_in, seek_req=seek_req, ) except asyncio.CancelledError: elapsed = cur_seek + progress[0] * 0.02 if elapsed > 1.0: _save_resume(bot, track, elapsed) raise except Exception: log.exception("music: stream error for %s", track.url) elapsed = cur_seek + progress[0] * 0.02 if elapsed > 1.0: _save_resume(bot, track, elapsed) break await done.wait() if progress[0] > 0: _clear_resume(bot) # Push finished track to history ps["history"].append(_Track(url=track.url, title=track.title, requester=track.requester, origin=track.origin)) if len(ps["history"]) > _MAX_HISTORY: ps["history"].pop(0) _cleanup_track(track) except asyncio.CancelledError: pass finally: # Clean up current track's cached file (skipped/stopped tracks) current = ps.get("current") if current: _cleanup_track(current) if duck_task and not duck_task.done(): duck_task.cancel() ps["current"] = None ps["done_event"] = None ps["task"] = None ps["duck_vol"] = None ps["duck_task"] = None ps["fade_vol"] = None ps["fade_step"] = None ps["progress"] = None ps["cur_seek"] = 0.0 ps["seek_req"] = None def _ensure_loop(bot, *, seek: float = 0.0, fade_in: float | bool = True) -> None: """Start the play loop if not already running.""" ps = _ps(bot) task = ps.get("task") if task and not task.done(): return ps["task"] = bot._spawn( _play_loop(bot, seek=seek, fade_in=fade_in), name="music-play-loop", ) _MAX_HISTORY = 10 async def _fade_and_cancel(bot, duration: float = 3.0) -> None: """Fade audio to zero over ``duration`` seconds, then cancel the task.""" ps = _ps(bot) task = ps.get("task") if not task or task.done(): return # Compute step from actual current volume so fade always spans `duration`. # At 3% vol: step = 0.03/40 = 0.00075 (still ~0.8s fade). # At 50% vol: step = 0.50/40 = 0.0125. cur_vol = ( ps["duck_vol"] if ps["duck_vol"] is not None else ps["volume"] ) / 100.0 n_frames = max(duration / 0.02, 1) step = max(cur_vol / n_frames, 0.0001) ps["fade_step"] = step ps["fade_vol"] = 0 log.debug("music: fading out (vol=%.2f, step=%.5f, duration=%.1fs)", cur_vol, step, duration) await asyncio.sleep(duration) ps["fade_step"] = None if not task.done(): task.cancel() try: await task except (asyncio.CancelledError, Exception): pass # -- Commands ---------------------------------------------------------------- @command("play", help="Music: !play ") async def cmd_play(bot, message): """Play a URL or add to queue if already playing. Usage: !play Play audio from URL (YouTube, SoundCloud, etc.) !play Search YouTube and play the first result Playlists are expanded into individual tracks. If the queue is nearly full, only as many tracks as will fit are enqueued. """ if not _is_mumble(bot): await bot.reply(message, "Music playback is Mumble-only") return parts = message.text.split(None, 1) if len(parts) < 2: await bot.reply(message, "Usage: !play ") return url = parts[1].strip() ps = _ps(bot) # Play a kept track by ID: !play #3 if url.startswith("#"): kid = url[1:] raw = bot.state.get("music", f"keep:{kid}") if not raw: await bot.reply(message, f"No kept track with ID #{kid}") return try: meta = json.loads(raw) except (json.JSONDecodeError, TypeError): await bot.reply(message, f"Bad metadata for #{kid}") return fpath = _MUSIC_DIR / meta.get("filename", "") if not fpath.is_file(): await bot.reply(message, f"File missing for #{kid}") return title = meta.get("title") or meta.get("filename", kid) track = _Track(url=meta.get("url", str(fpath)), title=title, requester=message.nick or "?") track.local_path = fpath track.keep = True was_idle = ps["current"] is None ps["queue"].append(track) if was_idle: await bot.reply(message, f"Playing: {_truncate(title)}") else: await bot.reply(message, f"Queued #{len(ps['queue'])}: {_truncate(title)}") _ensure_loop(bot) return is_search = not _is_url(url) if is_search: url = f"ytsearch10:{url}" if len(ps["queue"]) >= _MAX_QUEUE: await bot.reply(message, f"Queue full ({_MAX_QUEUE} tracks)") return remaining = _MAX_QUEUE - len(ps["queue"]) loop = asyncio.get_running_loop() resolved = await loop.run_in_executor(None, _resolve_tracks, url, remaining) # Search: pick one random result instead of enqueuing all if is_search and len(resolved) > 1: resolved = [random.choice(resolved)] was_idle = ps["current"] is None requester = message.nick or "?" added = 0 # Only set origin for direct URLs (not searches) so resume uses the # resolved video URL rather than an ephemeral search query origin = url if not is_search else "" for track_url, track_title in resolved[:remaining]: ps["queue"].append(_Track(url=track_url, title=track_title, requester=requester, origin=origin)) added += 1 total_resolved = len(resolved) if added == 1: title = _truncate(resolved[0][1]) if was_idle: await bot.reply(message, f"Playing: {title}") else: pos = len(ps["queue"]) await bot.reply(message, f"Queued #{pos}: {title}") elif added < total_resolved: await bot.reply( message, f"Queued {added} of {total_resolved} tracks (queue full)", ) else: await bot.reply(message, f"Queued {added} tracks") if was_idle: _ensure_loop(bot) @command("stop", help="Music: !stop") async def cmd_stop(bot, message): """Stop playback and clear queue.""" if not _is_mumble(bot): return ps = _ps(bot) ps["queue"].clear() task = ps.get("task") if task and not task.done(): await _fade_and_cancel(bot) else: ps["current"] = None ps["task"] = None ps["done_event"] = None ps["duck_vol"] = None ps["duck_task"] = None ps["fade_vol"] = None ps["fade_step"] = None ps["progress"] = None ps["cur_seek"] = 0.0 await bot.reply(message, "Stopped") @command("resume", help="Music: !resume -- resume last stopped track") async def cmd_resume(bot, message): """Resume playback from the last interrupted position. Loads the track URL and elapsed time saved when playback was stopped or skipped. The position persists across bot restarts. """ if not _is_mumble(bot): await bot.reply(message, "Music playback is Mumble-only") return ps = _ps(bot) if ps["current"] is not None: await bot.reply(message, "Already playing") return data = _load_resume(bot) if data is None: await bot.reply(message, "Nothing to resume") return elapsed = data.get("elapsed", 0.0) track = _Track( url=data["url"], title=data.get("title", data["url"]), requester=data.get("requester", "?"), ) ps["queue"].insert(0, track) _clear_resume(bot) await bot.reply( message, f"Resuming: {_truncate(track.title)} from {_fmt_time(elapsed)}", ) _ensure_loop(bot, seek=elapsed) @command("skip", help="Music: !skip", aliases=["next"]) async def cmd_skip(bot, message): """Skip current track, advance to next in queue.""" if not _is_mumble(bot): return ps = _ps(bot) if ps["current"] is None: await bot.reply(message, "Nothing playing") return skipped = ps["current"] # Push skipped track to history ps["history"].append(_Track(url=skipped.url, title=skipped.title, requester=skipped.requester, origin=skipped.origin)) if len(ps["history"]) > _MAX_HISTORY: ps["history"].pop(0) await _fade_and_cancel(bot) if ps["queue"]: _ensure_loop(bot) await bot.reply( message, f"Skipped: {_truncate(skipped.title)}", ) else: await bot.reply(message, "Skipped, queue empty") @command("prev", help="Music: !prev -- play previous track") async def cmd_prev(bot, message): """Go back to the previous track.""" if not _is_mumble(bot): return ps = _ps(bot) if not ps["history"]: await bot.reply(message, "No previous track") return prev = ps["history"].pop() # Re-queue current track so it plays next after prev if ps["current"] is not None: ps["queue"].insert(0, _Track(url=ps["current"].url, title=ps["current"].title, requester=ps["current"].requester, origin=ps["current"].origin)) ps["queue"].insert(0, prev) await _fade_and_cancel(bot) _ensure_loop(bot) await bot.reply(message, f"Previous: {_truncate(prev.title)}") @command("seek", help="Music: !seek ") async def cmd_seek(bot, message): """Seek to position in current track. Usage: !seek 1:30 Seek to 1 minute 30 seconds !seek 90 Seek to 90 seconds !seek +30 Jump forward 30 seconds !seek -30 Jump backward 30 seconds !seek +1:00 Jump forward 1 minute """ if not _is_mumble(bot): return ps = _ps(bot) parts = message.text.split(None, 1) if len(parts) < 2: await bot.reply(message, "Usage: !seek (e.g. 1:30, +30, -30)") return try: mode, seconds = _parse_seek(parts[1].strip()) except ValueError: await bot.reply(message, "Usage: !seek (e.g. 1:30, +30, -30)") return # Compute target position if mode == "abs": target = seconds else: progress = ps.get("progress") cur_seek = ps.get("cur_seek", 0.0) elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0) target = elapsed + seconds target = max(0.0, target) seek_req = ps.get("seek_req") if not seek_req: await bot.reply(message, "Nothing playing") return seek_req[0] = target ps["cur_seek"] = target if ps.get("progress"): ps["progress"][0] = 0 await bot.reply(message, f"Seeking to {_fmt_time(target)}") @command("queue", help="Music: !queue [url]") async def cmd_queue(bot, message): """Show queue or add a URL. Usage: !queue Show current queue !queue Add URL to queue (alias for !play) """ if not _is_mumble(bot): return parts = message.text.split(None, 1) if len(parts) >= 2: # Alias for !play await cmd_play(bot, message) return ps = _ps(bot) lines = [] if ps["current"]: lines.append( f"Now: {_truncate(ps['current'].title)}" f" [{ps['current'].requester}]" ) if ps["queue"]: for i, track in enumerate(ps["queue"], 1): lines.append( f" {i}. {_truncate(track.title)} [{track.requester}]" ) else: if not ps["current"]: lines.append("Queue empty") for line in lines: await bot.reply(message, line) @command("np", help="Music: !np") async def cmd_np(bot, message): """Show now-playing track.""" if not _is_mumble(bot): return ps = _ps(bot) if ps["current"] is None: await bot.reply(message, "Nothing playing") return track = ps["current"] progress = ps.get("progress") cur_seek = ps.get("cur_seek", 0.0) elapsed = cur_seek + (progress[0] * 0.02 if progress else 0.0) await bot.reply( message, f"Now playing: {_truncate(track.title)} [{track.requester}]" f" ({_fmt_time(elapsed)})", ) @command("testtone", help="Music: !testtone -- debug sine wave") async def cmd_testtone(bot, message): """Send a 3-second test tone for voice debugging.""" if not _is_mumble(bot): await bot.reply(message, "Mumble-only feature") return await bot.reply(message, "Sending 440Hz test tone (3s)...") await bot.test_tone(3.0) await bot.reply(message, "Test tone complete") @command("volume", help="Music: !volume [0-100|+N|-N]") async def cmd_volume(bot, message): """Get or set playback volume. Usage: !volume Show current volume !volume <0-100> Set volume (takes effect immediately) !volume +N/-N Adjust volume relatively """ if not _is_mumble(bot): return ps = _ps(bot) parts = message.text.split(None, 1) if len(parts) < 2: await bot.reply(message, f"Volume: {ps['volume']}%") return arg = parts[1].strip() relative = arg.startswith("+") or (arg.startswith("-") and arg != "-") try: val = int(arg) except ValueError: await bot.reply(message, "Usage: !volume <0-100|+N|-N>") return if relative: val = ps["volume"] + val if val < 0 or val > 100: await bot.reply(message, "Volume must be 0-100") return ps["volume"] = val bot.state.set("music", "volume", str(val)) await bot.reply(message, f"Volume set to {val}%") @command("duck", help="Music: !duck [on|off|floor N|silence N|restore N]") async def cmd_duck(bot, message): """Configure voice-activated volume ducking. Usage: !duck Show ducking status and settings !duck on Enable voice ducking !duck off Disable voice ducking !duck floor <0-100> Set floor volume % !duck silence Set silence timeout (seconds) !duck restore Set restore ramp duration (seconds) """ if not _is_mumble(bot): await bot.reply(message, "Mumble-only feature") return ps = _ps(bot) parts = message.text.split() if len(parts) < 2: state = "on" if ps["duck_enabled"] else "off" ducking = "" if ps["duck_vol"] is not None: ducking = f", ducked to {int(ps['duck_vol'])}%" await bot.reply( message, f"Duck: {state} | floor={ps['duck_floor']}%" f" silence={ps['duck_silence']}s" f" restore={ps['duck_restore']}s{ducking}", ) return sub = parts[1].lower() if sub == "on": ps["duck_enabled"] = True await bot.reply(message, "Voice ducking enabled") elif sub == "off": ps["duck_enabled"] = False ps["duck_vol"] = None await bot.reply(message, "Voice ducking disabled") elif sub == "floor": if len(parts) < 3: await bot.reply(message, "Usage: !duck floor <0-100>") return try: val = int(parts[2]) except ValueError: await bot.reply(message, "Usage: !duck floor <0-100>") return if val < 0 or val > 100: await bot.reply(message, "Floor must be 0-100") return ps["duck_floor"] = val await bot.reply(message, f"Duck floor set to {val}%") elif sub == "silence": if len(parts) < 3: await bot.reply(message, "Usage: !duck silence ") return try: val = int(parts[2]) except ValueError: await bot.reply(message, "Usage: !duck silence ") return if val < 1: await bot.reply(message, "Silence timeout must be >= 1") return ps["duck_silence"] = val await bot.reply(message, f"Duck silence set to {val}s") elif sub == "restore": if len(parts) < 3: await bot.reply(message, "Usage: !duck restore ") return try: val = int(parts[2]) except ValueError: await bot.reply(message, "Usage: !duck restore ") return if val < 1: await bot.reply(message, "Restore duration must be >= 1") return ps["duck_restore"] = val await bot.reply(message, f"Duck restore set to {val}s") else: await bot.reply( message, "Usage: !duck [on|off|floor N|silence N|restore N]", ) @command("keep", help="Music: !keep -- keep current track's audio file") async def cmd_keep(bot, message): """Mark the current track's local file to keep after playback. Fetches metadata (title, artist, duration) and persists it alongside the filename so ``!kept`` can display useful information. """ if not _is_mumble(bot): await bot.reply(message, "Mumble-only feature") return ps = _ps(bot) track = ps["current"] if track is None: await bot.reply(message, "Nothing playing") return if track.local_path is None: await bot.reply(message, "No local file for current track") return track.keep = True # Check if this track is already kept (by normalized URL) norm_url = _strip_playlist_params(track.url) for key in bot.state.keys("music"): if not key.startswith("keep:"): continue raw = bot.state.get("music", key) if not raw: continue try: existing = json.loads(raw) except (json.JSONDecodeError, TypeError): continue if _strip_playlist_params(existing.get("url", "")) == norm_url: kid = existing.get("id", key.split(":", 1)[1]) await bot.reply(message, f"Already kept as #{kid}") return # Assign a unique short ID last_id = int(bot.state.get("music", "keep_next_id") or "1") keep_id = last_id bot.state.set("music", "keep_next_id", str(last_id + 1)) # Fetch metadata in background loop = asyncio.get_running_loop() meta = await loop.run_in_executor(None, _fetch_metadata, track.url) # Move file from cache to kept directory with a clean name _MUSIC_DIR.mkdir(parents=True, exist_ok=True) tid = hashlib.md5(track.url.encode()).hexdigest()[:12] clean_name = _sanitize_filename(meta.get("title", ""), tid) ext = track.local_path.suffix dest = _MUSIC_DIR / f"{clean_name}{ext}" if track.local_path != dest and not dest.exists(): shutil.move(str(track.local_path), str(dest)) track.local_path = dest filename = track.local_path.name meta["filename"] = filename meta["url"] = track.url meta["id"] = keep_id bot.state.set("music", f"keep:{keep_id}", json.dumps(meta)) # Build display string title = meta.get("title") or track.title artist = meta.get("artist", "") dur = meta.get("duration", 0) label = _truncate(title) if artist and artist.lower() not in ("na", "unknown", ""): label += f" -- {artist}" if dur > 0: label += f" ({_fmt_time(dur)})" await bot.reply(message, f"Keeping #{keep_id}: {label}") @command("kept", help="Music: !kept [clear] -- list or clear kept files") async def cmd_kept(bot, message): """List or clear kept audio files in data/music/. When metadata is available (from ``!keep``), displays title, artist, duration, and file size. Falls back to filename + size otherwise. """ if not _is_mumble(bot): await bot.reply(message, "Mumble-only feature") return parts = message.text.split() if len(parts) >= 2 and parts[1].lower() == "clear": count = 0 if _MUSIC_DIR.is_dir(): for f in _MUSIC_DIR.iterdir(): if f.is_file(): f.unlink() count += 1 # Clear stored metadata and reset ID counter for key in bot.state.keys("music"): if key.startswith("keep:") or key == "keep_next_id": bot.state.delete("music", key) await bot.reply(message, f"Deleted {count} file(s)") return # Collect kept entries from state entries = [] for key in bot.state.keys("music"): if not key.startswith("keep:"): continue raw = bot.state.get("music", key) if not raw: continue try: meta = json.loads(raw) except (json.JSONDecodeError, TypeError): continue entries.append(meta) if not entries: await bot.reply(message, "No kept tracks") return entries.sort(key=lambda m: m.get("id", 0)) lines = [f"Kept tracks ({len(entries)}):"] for meta in entries: kid = meta.get("id", "?") title = meta.get("title", "") artist = meta.get("artist", "") dur = meta.get("duration", 0) filename = meta.get("filename", "") label = _truncate(title) if title else filename if artist and artist.lower() not in ("na", "unknown", ""): label += f" -- {artist}" if dur > 0: label += f" ({_fmt_time(dur)})" # Show file size if file exists fpath = _MUSIC_DIR / filename if filename else None size = "" if fpath and fpath.is_file(): size = f" [{fpath.stat().st_size / (1024 * 1024):.1f}MB]" lines.append(f" #{kid} {label}{size}") await bot.long_reply(message, lines, label="kept tracks") # -- Plugin lifecycle -------------------------------------------------------- async def on_connected(bot) -> None: """Called by MumbleBot after each (re)connection. Ensures the reconnect watcher is running -- triggers boot-resume and reconnect-resume without waiting for a user command. """ if not _is_mumble(bot): return ps = _ps(bot) saved_vol = bot.state.get("music", "volume") if saved_vol is not None: try: ps["volume"] = max(0, min(100, int(saved_vol))) except ValueError: pass if ps["_watcher_task"] is None and hasattr(bot, "_spawn"): ps["_watcher_task"] = bot._spawn( _reconnect_watcher(bot), name="music-reconnect-watcher", )