"""Plugin: timestamped operational log (SQLite per-channel).""" from __future__ import annotations import logging import sqlite3 from datetime import datetime, timezone from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) _DB_PATH = Path("data/opslog.db") _MAX_LIST = 10 _MAX_SEARCH = 10 _conn: sqlite3.Connection | None = None def _db() -> sqlite3.Connection: """Lazy-init the database connection and schema.""" global _conn if _conn is not None: return _conn _DB_PATH.parent.mkdir(parents=True, exist_ok=True) _conn = sqlite3.connect(str(_DB_PATH)) _conn.execute(""" CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, channel TEXT NOT NULL, nick TEXT NOT NULL, ts TEXT NOT NULL, message TEXT NOT NULL ) """) _conn.execute("CREATE INDEX IF NOT EXISTS idx_entries_channel ON entries(channel)") _conn.commit() return _conn @command("opslog", help="Op log: !opslog add|list|search|del|clear") async def cmd_opslog(bot, message): """Timestamped operational log per channel. Usage: !opslog add Add a log entry !opslog list [n] Show last n entries (default 5) !opslog search Search entries !opslog del Delete an entry !opslog clear Clear all entries for this channel (admin) """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !opslog [args]") return sub = parts[1].lower() rest = parts[2] if len(parts) > 2 else "" channel = message.target or "dm" if sub == "add": if not rest: await bot.reply(message, "Usage: !opslog add ") return ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M") db = _db() cur = db.execute( "INSERT INTO entries (channel, nick, ts, message) VALUES (?, ?, ?, ?)", (channel, message.nick or "?", ts, rest), ) db.commit() await bot.reply(message, f"[{cur.lastrowid}] logged") elif sub == "list": limit = _MAX_LIST if rest: try: limit = min(int(rest), _MAX_LIST) except ValueError: pass db = _db() rows = db.execute( "SELECT id, nick, ts, message FROM entries WHERE channel = ? " "ORDER BY id DESC LIMIT ?", (channel, limit), ).fetchall() if not rows: await bot.reply(message, "No entries") return for row_id, nick, ts, msg in reversed(rows): await bot.reply(message, f"[{row_id}] {ts} <{nick}> {msg}") elif sub == "search": if not rest: await bot.reply(message, "Usage: !opslog search ") return db = _db() rows = db.execute( "SELECT id, nick, ts, message FROM entries " "WHERE channel = ? AND message LIKE ? ORDER BY id DESC LIMIT ?", (channel, f"%{rest}%", _MAX_SEARCH), ).fetchall() if not rows: await bot.reply(message, f"No entries matching '{rest}'") return for row_id, nick, ts, msg in reversed(rows): await bot.reply(message, f"[{row_id}] {ts} <{nick}> {msg}") elif sub == "del": if not rest: await bot.reply(message, "Usage: !opslog del ") return try: entry_id = int(rest) except ValueError: await bot.reply(message, "Invalid ID") return db = _db() cur = db.execute( "DELETE FROM entries WHERE id = ? AND channel = ?", (entry_id, channel), ) db.commit() if cur.rowcount: await bot.reply(message, f"Deleted entry {entry_id}") else: await bot.reply(message, f"Entry {entry_id} not found") elif sub == "clear": if not bot._is_admin(message): await bot.reply(message, "Permission denied: clear requires admin") return db = _db() cur = db.execute("DELETE FROM entries WHERE channel = ?", (channel,)) db.commit() await bot.reply(message, f"Cleared {cur.rowcount} entries") else: await bot.reply(message, "Usage: !opslog [args]")