Files
derp/plugins/core.py
user 69976196cd feat: paste detailed help via FlaskPaste for !help command
!help <cmd> now pastes the command's docstring and appends the URL.
!help <plugin> pastes detail for all plugin commands.
!help (no args) pastes a full reference grouped by plugin.
Falls back gracefully when flaskpaste is not loaded or paste fails.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 22:51:55 +01:00

330 lines
11 KiB
Python

"""Core plugin: ping, help, version, plugin management."""
import asyncio
import textwrap
from collections import Counter
from derp import __version__
from derp.plugin import command
def _build_cmd_detail(handler, prefix: str) -> str:
    """Extract and format a command's docstring into a detail block.

    Args:
        handler: Command handler object; this function reads
            ``handler.callback.__doc__``, ``handler.name`` and ``handler.help``.
        prefix: Command prefix placed in front of the command name.

    Returns:
        ``"<prefix><name>[ -- <help>]\\n<docstring>"``, or ``""`` when the
        callback has no (or only a blank) docstring.
    """
    # Function-scope import keeps this block self-contained.
    import inspect

    # textwrap.dedent() is a no-op on ordinary docstrings: the summary line
    # directly follows the opening quotes and has no indentation, so the
    # common margin is empty and continuation lines keep their source indent.
    # inspect.cleandoc() implements the docstring-specific rule (first line
    # handled separately, common indent removed from the remaining lines).
    doc = inspect.cleandoc(handler.callback.__doc__ or "").strip()
    if not doc:
        return ""
    header = f"{prefix}{handler.name}"
    if handler.help:
        header += f" -- {handler.help}"
    return f"{header}\n{doc}"
async def _paste(bot, text: str) -> str | None:
"""Create a paste via FlaskPaste. Returns URL or None."""
fp = bot.registry._modules.get("flaskpaste")
if not fp:
return None
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, fp.create_paste, bot, text)
except Exception:
return None
@command("ping", help="Check if the bot is alive")
async def cmd_ping(bot, message):
    """Respond with pong."""
    # NOTE: this docstring is user-facing (pasted by !help); keep it terse.
    response = "pong"
    await bot.reply(message, response)
def _strip_cmd_prefix(name: str, prefix: str) -> str:
    """Remove leading occurrences of the whole command *prefix* from *name*."""
    # str.lstrip(prefix) treats its argument as a *set of characters*, so a
    # multi-character prefix would also eat leading letters of the command
    # name. Strip whole-prefix matches instead; for a single-character prefix
    # this is identical to lstrip().
    if not prefix:
        return name
    while name.startswith(prefix):
        name = name[len(prefix):]
    return name


def _first_doc_line(module) -> str:
    """Return the stripped first line of *module*'s docstring, or ``""``."""
    if not module:
        return ""
    return (getattr(module, "__doc__", "") or "").split("\n")[0].strip()


async def _reply_with_paste(bot, message, reply: str, detail: str) -> None:
    """Send *reply*, appending `` | <url>`` when *detail* could be pasted."""
    if detail:
        url = await _paste(bot, detail)
        # _paste returns None when pasting is unavailable or failed; the
        # short reply is still sent without a URL in that case.
        if url:
            reply += f" | {url}"
    await bot.reply(message, reply)


@command("help", help="List commands or show command/plugin help")
async def cmd_help(bot, message):
    """Show available commands, or help for a specific command or plugin.

    Usage: !help [command|plugin]
    """
    # NOTE: the docstring above is user-facing (it is pasted for "!help help").
    # With an argument: a matching command wins over a plugin of the same
    # name; anything else gets an "Unknown command or plugin" reply.
    # Without an argument: reply with every command visible in this channel
    # and paste a full reference grouped by plugin.
    channel = message.target if message.is_channel else None
    parts = message.text.split(None, 2)
    commands = bot.registry.commands

    if len(parts) > 1:
        name = _strip_cmd_prefix(parts[1].lower(), bot.prefix)

        # Check command first
        handler = commands.get(name)
        if handler and bot._plugin_allowed(handler.plugin, channel):
            help_text = handler.help or "No help available."
            await _reply_with_paste(
                bot,
                message,
                f"{bot.prefix}{name} -- {help_text}",
                _build_cmd_detail(handler, bot.prefix),
            )
            return

        # Check plugin
        module = bot.registry._modules.get(name)
        if module and bot._plugin_allowed(name, channel):
            desc = _first_doc_line(module)
            # The plugin was just confirmed as allowed in this channel, so
            # filtering by ownership alone is sufficient here.
            cmds = sorted(k for k, v in commands.items() if v.plugin == name)
            lines = [f"{name} -- {desc}" if desc else name]
            if cmds:
                lines.append(f"Commands: {', '.join(bot.prefix + c for c in cmds)}")
            # Detail blocks for every plugin command that has a docstring
            details = [
                blk
                for blk in (_build_cmd_detail(commands[c], bot.prefix) for c in cmds)
                if blk
            ]
            await _reply_with_paste(
                bot, message, " | ".join(lines), "\n\n".join(details)
            )
            return

        await bot.reply(message, f"Unknown command or plugin: {name}")
        return

    # List all commands visible in this channel
    names = sorted(
        k for k, v in commands.items()
        if bot._plugin_allowed(v.plugin, channel)
    )

    # Build full reference grouped by plugin
    by_plugin: dict[str, list[str]] = {}
    for cmd_name in names:
        by_plugin.setdefault(commands[cmd_name].plugin, []).append(cmd_name)

    sections = []
    for plugin_name in sorted(by_plugin):
        header = f"[{plugin_name}]"
        desc = _first_doc_line(bot.registry._modules.get(plugin_name))
        if desc:
            header += f" {desc}"
        entries = []
        for cmd_name in by_plugin[plugin_name]:
            h = commands[cmd_name]
            entry = _build_cmd_detail(h, bot.prefix)
            if not entry:
                # No docstring: fall back to the one-line help summary.
                entry = f"{bot.prefix}{cmd_name}"
                if h.help:
                    entry += f" -- {h.help}"
            entries.append(entry)
        sections.append(header + "\n" + "\n\n".join(entries))

    await _reply_with_paste(bot, message, ", ".join(names), "\n\n".join(sections))
@command("version", help="Show bot version")
async def cmd_version(bot, message):
    """Report the running version."""
    # Reply shape: "derp <package version> (<bot name>)".
    banner = f"derp {__version__} ({bot.name})"
    await bot.reply(message, banner)
@command("uptime", help="Show how long the bot has been running")
async def cmd_uptime(bot, message):
    """Report bot uptime."""
    import time

    # bot._started is subtracted from time.monotonic(); presumably it was
    # captured from the same clock at startup -- confirm against the bot class.
    total = int(time.monotonic() - bot._started)
    days, leftover = divmod(total, 86400)
    hours, leftover = divmod(leftover, 3600)
    minutes, secs = divmod(leftover, 60)

    # Larger units are shown only when non-zero; seconds are always shown.
    fields = [
        f"{amount}{unit}"
        for amount, unit in ((days, "d"), (hours, "h"), (minutes, "m"))
        if amount
    ]
    fields.append(f"{secs}s")
    await bot.reply(message, "up " + " ".join(fields))
@command("load", help="Hot-load a plugin: !load <name>", admin=True)
async def cmd_load(bot, message):
    """Load a new plugin from the plugins directory."""
    # bot.load_plugin(name) returns an (ok, reason) pair; reason is echoed
    # on both success and failure.
    argv = message.text.split(None, 2)
    if len(argv) >= 2:
        plugin_name = argv[1].lower()
        loaded, detail = bot.load_plugin(plugin_name)
        if loaded:
            outcome = f"Loaded plugin: {plugin_name} ({detail})"
        else:
            outcome = f"Failed to load plugin: {detail}"
    else:
        outcome = "Usage: !load <plugin>"
    await bot.reply(message, outcome)
@command("reload", help="Reload a plugin: !reload <name>", admin=True)
async def cmd_reload(bot, message):
    """Unload and reload a plugin, picking up file changes."""
    # bot.reload_plugin(name) returns an (ok, reason) pair; reason is only
    # reported when the reload failed.
    argv = message.text.split(None, 2)
    if len(argv) < 2:
        outcome = "Usage: !reload <plugin>"
    else:
        plugin_name = argv[1].lower()
        reloaded, why = bot.reload_plugin(plugin_name)
        outcome = (
            f"Reloaded plugin: {plugin_name}"
            if reloaded
            else f"Failed to reload plugin: {why}"
        )
    await bot.reply(message, outcome)
@command("unload", help="Unload a plugin: !unload <name>", admin=True)
async def cmd_unload(bot, message):
    """Unload a plugin, removing all its handlers."""
    # bot.unload_plugin(name) returns an (ok, reason) pair; reason is only
    # reported when unloading failed.
    argv = message.text.split(None, 2)
    if len(argv) < 2:
        await bot.reply(message, "Usage: !unload <plugin>")
        return

    plugin_name = argv[1].lower()
    removed, why = bot.unload_plugin(plugin_name)
    if not removed:
        await bot.reply(message, f"Failed to unload plugin: {why}")
        return
    await bot.reply(message, f"Unloaded plugin: {plugin_name}")
@command("plugins", help="List loaded plugins")
async def cmd_plugins(bot, message):
    """List all loaded plugins with handler counts."""
    # A plugin's count is its command handlers plus its event handlers;
    # plugins owning neither do not appear in the listing.
    tally: Counter[str] = Counter(
        h.plugin for h in bot.registry.commands.values()
    )
    for bucket in bot.registry.events.values():
        tally.update(h.plugin for h in bucket)

    listing = ", ".join(f"{plugin} ({tally[plugin]})" for plugin in sorted(tally))
    await bot.reply(message, f"Plugins: {listing}")
@command("whoami", help="Show your hostmask and permission tier")
async def cmd_whoami(bot, message):
    """Display the sender's hostmask and permission level."""
    sender = message.prefix
    shown = sender or "unknown"
    labels = [bot._get_tier(message)]

    # bot._opers may be absent on this bot; treat that as "no known opers".
    known_opers = getattr(bot, "_opers", set())
    if sender and sender in known_opers:
        labels.append("IRCOP")

    await bot.reply(message, f"{shown} [{', '.join(labels)}]")
@command("admins", help="Show configured permission tiers and detected opers", admin=True)
async def cmd_admins(bot, message):
    """Display configured permission tiers and known IRC operators."""
    # The Admin and IRCOPs sections are always present (with "(none)" when
    # empty); Sorcerer, Oper and Trusted are shown only when non-empty.
    if bot._admins:
        sections = [f"Admin: {', '.join(bot._admins)}"]
    else:
        sections = ["Admin: (none)"]

    optional_tiers = (
        ("Sorcerer", getattr(bot, "_sorcerers", [])),
        ("Oper", bot._operators),
        ("Trusted", bot._trusted),
    )
    sections.extend(
        f"{label}: {', '.join(members)}"
        for label, members in optional_tiers
        if members
    )

    ircops = getattr(bot, "_opers", set())
    sections.append(
        f"IRCOPs: {', '.join(sorted(ircops))}" if ircops else "IRCOPs: (none)"
    )
    await bot.reply(message, " | ".join(sections))
@command("deaf", help="Toggle voice listener deaf on Mumble")
async def cmd_deaf(bot, message):
    """Toggle the voice listener's deaf state on Mumble.

    Targets the bot with ``receive_sound = true`` (merlin) so that
    deafening stops ducking without affecting the music bot's playback.
    """
    # First registered peer flagged with _receive_sound, if any
    peers = getattr(bot.registry, "_bots", {})
    listener = next(
        (peer for peer in peers.values() if getattr(peer, "_receive_sound", False)),
        None,
    )

    # Without a listener, fall back to the bot that received the command.
    mumble = getattr(listener or bot, "_mumble", None)
    if mumble is None:
        # No Mumble connection on the chosen bot: nothing to toggle, no reply.
        return

    me = mumble.users.myself
    label = getattr(listener, "nick", "bot")
    if not me.get("self_deaf", False):
        me.deafen()
        await bot.reply(message, f"{label}: deafened")
        return

    # Currently deaf: undeafen, then unmute as well.
    me.undeafen()
    me.unmute()
    await bot.reply(message, f"{label}: undeafened")
@command("state", help="Inspect plugin state: !state <list|get|del|clear> ...", admin=True)
async def cmd_state(bot, message):
"""Manage the plugin state store.
Usage:
!state list <plugin> List keys
!state get <plugin> <key> Get a value
!state del <plugin> <key> Delete a key
!state clear <plugin> Clear all state
"""
parts = message.text.split()
if len(parts) < 3:
await bot.reply(message, "Usage: !state <list|get|del|clear> <plugin> [key]")
return
action = parts[1].lower()
plugin = parts[2]
if action == "list":
keys = bot.state.keys(plugin)
if keys:
await bot.reply(message, f"{plugin}: {', '.join(keys)}")
else:
await bot.reply(message, f"{plugin}: (no keys)")
elif action == "get":
if len(parts) < 4:
await bot.reply(message, "Usage: !state get <plugin> <key>")
return
key = parts[3]
value = bot.state.get(plugin, key)
if value is not None:
await bot.reply(message, f"{plugin}.{key} = {value}")
else:
await bot.reply(message, f"{plugin}.{key}: not set")
elif action == "del":
if len(parts) < 4:
await bot.reply(message, "Usage: !state del <plugin> <key>")
return
key = parts[3]
if bot.state.delete(plugin, key):
await bot.reply(message, f"Deleted {plugin}.{key}")
else:
await bot.reply(message, f"{plugin}.{key}: not found")
elif action == "clear":
count = bot.state.clear(plugin)
await bot.reply(message, f"Cleared {count} key(s) from {plugin}")
else:
await bot.reply(message, "Usage: !state <list|get|del|clear> <plugin> [key]")