Hostmask-based admin controls with automatic IRCOP detection via WHO. Permission enforcement in the central dispatch path denies restricted commands to non-admins. Includes !whoami and !admins commands, marks load/reload/unload as admin-only. Also lands previously-implemented SASL PLAIN auth, token-bucket rate limiting, and CTCP VERSION/TIME/PING responses that were staged but uncommitted. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
166 lines
5.4 KiB
Python
166 lines
5.4 KiB
Python
"""Core plugin: ping, help, version, plugin management."""
|
|
|
|
from collections import Counter
|
|
|
|
from derp import __version__
|
|
from derp.plugin import command
|
|
|
|
|
|
@command("ping", help="Check if the bot is alive")
|
|
async def cmd_ping(bot, message):
|
|
"""Respond with pong."""
|
|
await bot.reply(message, "pong")
|
|
|
|
|
|
@command("help", help="List commands or show command/plugin help")
|
|
async def cmd_help(bot, message):
|
|
"""Show available commands, or help for a specific command or plugin.
|
|
|
|
Usage: !help [command|plugin]
|
|
"""
|
|
parts = message.text.split(None, 2)
|
|
if len(parts) > 1:
|
|
name = parts[1].lower().lstrip(bot.prefix)
|
|
|
|
# Check command first
|
|
handler = bot.registry.commands.get(name)
|
|
if handler:
|
|
help_text = handler.help or "No help available."
|
|
await bot.reply(message, f"{bot.prefix}{name} -- {help_text}")
|
|
return
|
|
|
|
# Check plugin
|
|
module = bot.registry._modules.get(name)
|
|
if module:
|
|
desc = (getattr(module, "__doc__", "") or "").split("\n")[0].strip()
|
|
cmds = sorted(
|
|
k for k, v in bot.registry.commands.items() if v.plugin == name
|
|
)
|
|
lines = [f"{name} -- {desc}" if desc else name]
|
|
if cmds:
|
|
lines.append(f"Commands: {', '.join(bot.prefix + c for c in cmds)}")
|
|
await bot.reply(message, " | ".join(lines))
|
|
return
|
|
|
|
await bot.reply(message, f"Unknown command or plugin: {name}")
|
|
return
|
|
|
|
# List all commands
|
|
names = sorted(bot.registry.commands.keys())
|
|
await bot.reply(message, f"Commands: {', '.join(bot.prefix + n for n in names)}")
|
|
|
|
|
|
@command("version", help="Show bot version")
|
|
async def cmd_version(bot, message):
|
|
"""Report the running version."""
|
|
await bot.reply(message, f"derp {__version__}")
|
|
|
|
|
|
@command("uptime", help="Show how long the bot has been running")
|
|
async def cmd_uptime(bot, message):
|
|
"""Report bot uptime."""
|
|
import time
|
|
|
|
elapsed = int(time.monotonic() - bot._started)
|
|
days, rem = divmod(elapsed, 86400)
|
|
hours, rem = divmod(rem, 3600)
|
|
minutes, secs = divmod(rem, 60)
|
|
parts = []
|
|
if days:
|
|
parts.append(f"{days}d")
|
|
if hours:
|
|
parts.append(f"{hours}h")
|
|
if minutes:
|
|
parts.append(f"{minutes}m")
|
|
parts.append(f"{secs}s")
|
|
await bot.reply(message, f"up {' '.join(parts)}")
|
|
|
|
|
|
@command("load", help="Hot-load a plugin: !load <name>", admin=True)
|
|
async def cmd_load(bot, message):
|
|
"""Load a new plugin from the plugins directory."""
|
|
parts = message.text.split(None, 2)
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !load <plugin>")
|
|
return
|
|
name = parts[1].lower()
|
|
ok, reason = bot.load_plugin(name)
|
|
if ok:
|
|
await bot.reply(message, f"Loaded plugin: {name} ({reason})")
|
|
else:
|
|
await bot.reply(message, f"Failed to load plugin: {reason}")
|
|
|
|
|
|
@command("reload", help="Reload a plugin: !reload <name>", admin=True)
|
|
async def cmd_reload(bot, message):
|
|
"""Unload and reload a plugin, picking up file changes."""
|
|
parts = message.text.split(None, 2)
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !reload <plugin>")
|
|
return
|
|
name = parts[1].lower()
|
|
ok, reason = bot.reload_plugin(name)
|
|
if ok:
|
|
await bot.reply(message, f"Reloaded plugin: {name}")
|
|
else:
|
|
await bot.reply(message, f"Failed to reload plugin: {reason}")
|
|
|
|
|
|
@command("unload", help="Unload a plugin: !unload <name>", admin=True)
|
|
async def cmd_unload(bot, message):
|
|
"""Unload a plugin, removing all its handlers."""
|
|
parts = message.text.split(None, 2)
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !unload <plugin>")
|
|
return
|
|
name = parts[1].lower()
|
|
ok, reason = bot.unload_plugin(name)
|
|
if ok:
|
|
await bot.reply(message, f"Unloaded plugin: {name}")
|
|
else:
|
|
await bot.reply(message, f"Failed to unload plugin: {reason}")
|
|
|
|
|
|
@command("plugins", help="List loaded plugins")
|
|
async def cmd_plugins(bot, message):
|
|
"""List all loaded plugins with handler counts."""
|
|
counts: Counter[str] = Counter()
|
|
for handler in bot.registry.commands.values():
|
|
counts[handler.plugin] += 1
|
|
for handlers in bot.registry.events.values():
|
|
for handler in handlers:
|
|
counts[handler.plugin] += 1
|
|
parts = [f"{name} ({counts[name]})" for name in sorted(counts)]
|
|
await bot.reply(message, f"Plugins: {', '.join(parts)}")
|
|
|
|
|
|
@command("whoami", help="Show your hostmask and admin status")
|
|
async def cmd_whoami(bot, message):
|
|
"""Display the sender's hostmask and permission level."""
|
|
prefix = message.prefix or "unknown"
|
|
is_admin = bot._is_admin(message)
|
|
is_oper = message.prefix in bot._opers if message.prefix else False
|
|
tags = []
|
|
if is_admin:
|
|
tags.append("admin")
|
|
else:
|
|
tags.append("user")
|
|
if is_oper:
|
|
tags.append("IRCOP")
|
|
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
|
|
|
|
|
|
@command("admins", help="Show configured admin patterns and detected opers", admin=True)
|
|
async def cmd_admins(bot, message):
|
|
"""Display admin hostmask patterns and known IRC operators."""
|
|
parts = []
|
|
if bot._admins:
|
|
parts.append(f"Patterns: {', '.join(bot._admins)}")
|
|
else:
|
|
parts.append("Patterns: (none)")
|
|
if bot._opers:
|
|
parts.append(f"Opers: {', '.join(sorted(bot._opers))}")
|
|
else:
|
|
parts.append("Opers: (none)")
|
|
await bot.reply(message, " | ".join(parts))
|