diff --git a/plugins/abuseipdb.py b/plugins/abuseipdb.py index fa625b4..fd711a9 100644 --- a/plugins/abuseipdb.py +++ b/plugins/abuseipdb.py @@ -142,5 +142,4 @@ async def cmd_abuse(bot, message): return f"{addr} -- error: {exc}" results = await asyncio.gather(*[_query(a) for a in addrs]) - for line in results: - await bot.reply(message, line) + await bot.long_reply(message, list(results), label="abuse check") diff --git a/plugins/alert.py b/plugins/alert.py index 958cc4a..232b86c 100644 --- a/plugins/alert.py +++ b/plugins/alert.py @@ -2012,6 +2012,7 @@ async def cmd_alert(bot, message): return loop = asyncio.get_running_loop() fp = bot.registry._modules.get("flaskpaste") + history_lines = [] for row_id, backend, title, url, date, found_at, short_id, short_url in reversed(rows): ts = found_at[:10] title = _truncate(title) if title else "(no title)" @@ -2033,7 +2034,8 @@ async def cmd_alert(bot, message): line = f"[{name}/{backend}/{short_id}] ({date or ts}) {title}" if display_url: line += f" -- {display_url}" - await bot.reply(message, line) + history_lines.append(line) + await bot.long_reply(message, history_lines, label="history") return # -- info (any user, channel only) --------------------------------------- diff --git a/plugins/crtsh.py b/plugins/crtsh.py index 6fe8082..0c11f36 100644 --- a/plugins/crtsh.py +++ b/plugins/crtsh.py @@ -182,6 +182,4 @@ async def cmd_cert(bot, message): await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...") results = await asyncio.gather(*[analyze_domain(d) for d in domains]) - - for line in results: - await bot.reply(message, line) + await bot.long_reply(message, list(results), label="certs") diff --git a/plugins/dork.py b/plugins/dork.py index bc6831a..b4b61e5 100644 --- a/plugins/dork.py +++ b/plugins/dork.py @@ -67,8 +67,9 @@ async def cmd_dork(bot, message): subcmd = parts[1].lower() if subcmd == "list": - lines = [f" {k:<10} {desc}" for k, (_, desc) in sorted(_DORKS.items())] - await bot.reply(message, "Dork categories:\n" + "\n".join(lines)) + lines = ["Dork categories:"] + lines.extend(f" {k:<10} {desc}" for k, (_, desc) in sorted(_DORKS.items())) + await bot.long_reply(message, lines, label="dork categories") return if len(parts) < 3: diff --git a/plugins/exploitdb.py b/plugins/exploitdb.py index bda5a63..169d142 100644 --- a/plugins/exploitdb.py +++ b/plugins/exploitdb.py @@ -158,10 +158,10 @@ async def cmd_exploitdb(bot, message): if not matches: await bot.reply(message, f"No exploits matching '{term}'") return - for entry in matches[:_MAX_RESULTS]: - await bot.reply(message, _format_entry(entry)) + lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: - await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})") + lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") + await bot.long_reply(message, lines, label="exploits") return if sub.lower() == "cve": @@ -177,10 +177,10 @@ async def cmd_exploitdb(bot, message): if not matches: await bot.reply(message, f"No exploits for {cve_id}") return - for entry in matches[:_MAX_RESULTS]: - await bot.reply(message, _format_entry(entry)) + lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: - await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})") + lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") + await bot.long_reply(message, lines, label="exploits") return # Direct ID lookup @@ -209,7 +209,7 @@ async def cmd_exploitdb(bot, message): if not matches: await bot.reply(message, f"No exploits matching '{term}'") return - for entry in matches[:_MAX_RESULTS]: - await bot.reply(message, _format_entry(entry)) + lines = [_format_entry(e) for e in matches[:_MAX_RESULTS]] if len(matches) > _MAX_RESULTS: - await bot.reply(message, f"({len(matches)} total, showing {_MAX_RESULTS})") + lines.append(f"({len(matches)} total, showing {_MAX_RESULTS})") + await bot.long_reply(message, lines, label="exploits") diff --git a/plugins/subdomain.py b/plugins/subdomain.py index 11ac5d2..9ec5e61 100644 --- a/plugins/subdomain.py +++ b/plugins/subdomain.py @@ -153,8 +153,7 @@ async def cmd_subdomain(bot, message): total = len(sorted_subs) shown = sorted_subs[:_MAX_RESULTS] - for fqdn, ips in shown: - await bot.reply(message, f" {fqdn} -> {', '.join(ips)}") - + lines = [f" {fqdn} -> {', '.join(ips)}" for fqdn, ips in shown] suffix = f" ({total - _MAX_RESULTS} more)" if total > _MAX_RESULTS else "" - await bot.reply(message, f"{domain}: {total} subdomains found{suffix}") + lines.append(f"{domain}: {total} subdomains found{suffix}") + await bot.long_reply(message, lines, label="subdomains") diff --git a/src/derp/bot.py b/src/derp/bot.py index dc321ce..4d4b175 100644 --- a/src/derp/bot.py +++ b/src/derp/bot.py @@ -460,6 +460,52 @@ class Bot: if target: await self.send(target, text) + async def long_reply( + self, msg: Message, lines: list[str], *, + label: str = "", + ) -> None: + """Reply with a list of lines; paste overflow to FlaskPaste. + + If len(lines) <= paste_threshold, sends each line via send(). + If len(lines) > paste_threshold, creates a paste with all lines + and sends a preview (first 2 lines) + paste URL. + Falls back to sending all lines if FlaskPaste is unavailable. + """ + threshold = self.config["bot"].get("paste_threshold", 4) + target = msg.target if msg.is_channel else msg.nick + + if not lines or not target: + return + + if len(lines) <= threshold: + for line in lines: + await self.send(target, line) + return + + # Attempt paste overflow + fp = self.registry._modules.get("flaskpaste") + paste_url = None + if fp: + full_text = "\n".join(lines) + loop = asyncio.get_running_loop() + paste_url = await loop.run_in_executor( + None, fp.create_paste, self, full_text, + ) + + if paste_url: + preview_count = min(2, threshold - 1) + for line in lines[:preview_count]: + await self.send(target, line) + remaining = len(lines) - preview_count + suffix = f" ({label})" if label else "" + await self.send( + target, + f"... {remaining} more lines{suffix}: {paste_url}", + ) + else: + for line in lines: + await self.send(target, line) + async def action(self, target: str, text: str) -> None: """Send a CTCP ACTION (/me) to a target.""" await self.send(target, f"\x01ACTION {text}\x01") diff --git a/src/derp/config.py b/src/derp/config.py index d302fb2..565c908 100644 --- a/src/derp/config.py +++ b/src/derp/config.py @@ -27,6 +27,7 @@ DEFAULTS: dict = { "plugins_dir": "plugins", "rate_limit": 2.0, "rate_burst": 5, + "paste_threshold": 4, "admins": [], }, "channels": {}, diff --git a/tests/test_paste_overflow.py b/tests/test_paste_overflow.py new file mode 100644 index 0000000..b24617b --- /dev/null +++ b/tests/test_paste_overflow.py @@ -0,0 +1,175 @@ +"""Tests for Bot.long_reply() paste overflow behaviour.""" + +import asyncio +import types + +from derp.bot import Bot +from derp.irc import Message +from derp.plugin import PluginRegistry + +# -- Helpers ----------------------------------------------------------------- + +def _make_bot(*, paste_threshold: int = 4, flaskpaste_mod=None) -> Bot: + """Build a Bot with minimal config and a captured send log.""" + config = { + "server": { + "host": "localhost", "port": 6667, "tls": False, + "nick": "testbot", "user": "testbot", "realname": "test", + }, + "bot": { + "prefix": "!", + "channels": ["#test"], + "plugins_dir": "plugins", + "rate_limit": 100.0, + "rate_burst": 100, + "paste_threshold": paste_threshold, + "admins": [], + }, + } + registry = PluginRegistry() + if flaskpaste_mod is not None: + registry._modules["flaskpaste"] = flaskpaste_mod + bot = Bot(config, registry) + bot._sent: list[tuple[str, str]] = [] # type: ignore[attr-defined] + + async def _capturing_send(target: str, text: str) -> None: + bot._sent.append((target, text)) + + bot.send = _capturing_send # type: ignore[assignment] + return bot + + +def _msg(text: str = "", target: str = "#test", nick: str = "alice") -> Message: + """Create a channel PRIVMSG.""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=[target, text], tags={}, + ) + + +def _pm(text: str = "", nick: str = "alice") -> Message: + """Create a private PRIVMSG (target = bot nick).""" + return Message( + raw="", prefix=f"{nick}!~{nick}@host", nick=nick, + command="PRIVMSG", params=["testbot", text], tags={}, + ) + + +def _make_fp_mod(*, paste_url: str | None = "https://paste.example/abc"): + """Build a fake flaskpaste module with create_paste().""" + mod = types.ModuleType("flaskpaste") + mod.create_paste = lambda bot, content: paste_url # type: ignore[attr-defined] + return mod + + +# -- Tests ------------------------------------------------------------------- + +class TestShortReply: + def test_sends_all(self): + """Lines <= threshold are sent individually, no paste.""" + bot = _make_bot(paste_threshold=4) + msg = _msg() + lines = ["line 1", "line 2", "line 3"] + asyncio.run(bot.long_reply(msg, lines)) + assert len(bot._sent) == 3 + assert bot._sent[0] == ("#test", "line 1") + assert bot._sent[1] == ("#test", "line 2") + assert bot._sent[2] == ("#test", "line 3") + + +class TestLongReply: + def test_creates_paste(self): + """Lines > threshold creates paste, sends preview + URL.""" + fp = _make_fp_mod(paste_url="https://paste.example/xyz") + bot = _make_bot(paste_threshold=3, flaskpaste_mod=fp) + msg = _msg() + lines = ["line 1", "line 2", "line 3", "line 4", "line 5"] + asyncio.run(bot.long_reply(msg, lines, label="results")) + # preview_count = min(2, threshold-1) = min(2, 2) = 2 + assert len(bot._sent) == 3 + assert bot._sent[0] == ("#test", "line 1") + assert bot._sent[1] == ("#test", "line 2") + assert "3 more lines" in bot._sent[2][1] + assert "(results)" in bot._sent[2][1] + assert "https://paste.example/xyz" in bot._sent[2][1] + + def test_fallback_no_flaskpaste(self): + """No flaskpaste module loaded -- falls back to sending all lines.""" + bot = _make_bot(paste_threshold=2) + msg = _msg() + lines = ["a", "b", "c", "d"] + asyncio.run(bot.long_reply(msg, lines)) + assert len(bot._sent) == 4 + assert [t for _, t in bot._sent] == ["a", "b", "c", "d"] + + def test_fallback_paste_fails(self): + """create_paste returns None -- falls back to sending all lines.""" + fp = _make_fp_mod(paste_url=None) + bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp) + msg = _msg() + lines = ["a", "b", "c"] + asyncio.run(bot.long_reply(msg, lines)) + assert len(bot._sent) == 3 + assert [t for _, t in bot._sent] == ["a", "b", "c"] + + def test_label_in_overflow_message(self): + """Label appears in the overflow message.""" + fp = _make_fp_mod() + bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp) + msg = _msg() + lines = ["a", "b", "c"] + asyncio.run(bot.long_reply(msg, lines, label="history")) + overflow = bot._sent[-1][1] + assert "(history)" in overflow + + def test_no_label(self): + """Overflow message omits label suffix when label is empty.""" + fp = _make_fp_mod() + bot = _make_bot(paste_threshold=2, flaskpaste_mod=fp) + msg = _msg() + lines = ["a", "b", "c"] + asyncio.run(bot.long_reply(msg, lines)) + overflow = bot._sent[-1][1] + assert "more lines:" in overflow + assert "()" not in overflow + + +class TestThreshold: + def test_configurable(self): + """Custom threshold from config controls overflow point.""" + fp = _make_fp_mod() + bot = _make_bot(paste_threshold=10, flaskpaste_mod=fp) + msg = _msg() + + # 10 lines == threshold -> no paste + lines_at = [f"line {i}" for i in range(10)] + asyncio.run(bot.long_reply(msg, lines_at)) + assert len(bot._sent) == 10 + + def test_over_threshold_pastes(self): + """Lines exceeding threshold triggers paste.""" + fp = _make_fp_mod() + bot = _make_bot(paste_threshold=10, flaskpaste_mod=fp) + msg = _msg() + lines_over = [f"line {i}" for i in range(11)] + asyncio.run(bot.long_reply(msg, lines_over)) + assert len(bot._sent) == 3 # 2 preview + overflow msg + + +class TestEdgeCases: + def test_empty_lines_noop(self): + """Empty list produces no output.""" + bot = _make_bot() + msg = _msg() + asyncio.run(bot.long_reply(msg, [])) + assert bot._sent == [] + + def test_pm_uses_nick(self): + """Private messages use nick as target.""" + bot = _make_bot(paste_threshold=4) + msg = _pm() + lines = ["x", "y"] + asyncio.run(bot.long_reply(msg, lines)) + assert len(bot._sent) == 2 + assert bot._sent[0] == ("alice", "x") + assert bot._sent[1] == ("alice", "y")