Files
derp/tests/test_canary.py
user e3bb793574 feat: add canary, tcping, archive, resolve plugins
canary: generate realistic fake credentials (token/aws/basic) for
planting as canary tripwires. Per-channel state persistence.

tcping: TCP connect latency probe through SOCKS5 proxy with
min/avg/max reporting. Proxy-compatible alternative to traceroute.

archive: save URLs to Wayback Machine via Save Page Now API,
routed through SOCKS5 proxy.

resolve: bulk DNS resolution (up to 10 hosts) via TCP DNS through
SOCKS5 proxy with concurrent asyncio.gather.

83 new tests (1010 total), docs updated.
2026-02-20 19:38:10 +01:00

303 lines
9.5 KiB
Python

"""Tests for the canary token generator plugin."""
import asyncio
import importlib.util
import sys
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.canary", Path(__file__).resolve().parent.parent / "plugins" / "canary.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.canary import ( # noqa: E402
_gen_aws,
_gen_basic,
_gen_token,
_load,
_save,
cmd_canary,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self._admin = admin
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#ops") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str, nick: str = "alice") -> Message:
"""Create a private PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["botname", text], tags={},
)
# -- Token generators -------------------------------------------------------
class TestGenToken:
def test_length(self):
tok = _gen_token()
assert len(tok) == 40
def test_hex(self):
tok = _gen_token()
int(tok, 16) # Should not raise
def test_unique(self):
assert _gen_token() != _gen_token()
class TestGenAws:
def test_access_key_format(self):
pair = _gen_aws()
assert pair["access_key"].startswith("AKIA")
assert len(pair["access_key"]) == 20
def test_secret_key_present(self):
pair = _gen_aws()
assert len(pair["secret_key"]) > 20
class TestGenBasic:
def test_user_format(self):
pair = _gen_basic()
assert pair["user"].startswith("svc")
assert len(pair["user"]) == 8
def test_pass_present(self):
pair = _gen_basic()
assert len(pair["pass"]) > 10
# -- State helpers -----------------------------------------------------------
class TestStateHelpers:
def test_save_and_load(self):
bot = _FakeBot()
store = {"mykey": {"type": "token", "value": "abc", "created": "now"}}
_save(bot, "#ops", store)
loaded = _load(bot, "#ops")
assert loaded == store
def test_load_empty(self):
bot = _FakeBot()
assert _load(bot, "#ops") == {}
def test_load_bad_json(self):
bot = _FakeBot()
bot.state.set("canary", "#ops", "not json{{{")
assert _load(bot, "#ops") == {}
# -- Command: gen ------------------------------------------------------------
class TestCmdGen:
def test_gen_default_token(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen db-cred")))
assert len(bot.replied) == 1
assert "db-cred" in bot.replied[0]
assert "token" in bot.replied[0]
store = _load(bot, "#ops")
assert "db-cred" in store
assert store["db-cred"]["type"] == "token"
assert len(store["db-cred"]["value"]) == 40
def test_gen_aws(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen aws staging-key")))
assert "staging-key" in bot.replied[0]
assert "AKIA" in bot.replied[0]
store = _load(bot, "#ops")
assert store["staging-key"]["type"] == "aws"
def test_gen_basic(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen basic svc-login")))
assert "svc-login" in bot.replied[0]
store = _load(bot, "#ops")
assert store["svc-login"]["type"] == "basic"
assert "user" in store["svc-login"]["value"]
def test_gen_requires_admin(self):
bot = _FakeBot(admin=False)
asyncio.run(cmd_canary(bot, _msg("!canary gen mytoken")))
assert "Permission denied" in bot.replied[0]
def test_gen_requires_channel(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _pm("!canary gen mytoken")))
assert "channel" in bot.replied[0].lower()
def test_gen_duplicate(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
bot.replied.clear()
asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
assert "already exists" in bot.replied[0]
def test_gen_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen")))
assert "Usage" in bot.replied[0]
def test_gen_type_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen aws")))
assert "Usage" in bot.replied[0]
def test_gen_invalid_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary gen b@d!")))
assert "Label" in bot.replied[0]
# -- Command: list -----------------------------------------------------------
class TestCmdList:
def test_list_empty(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary list")))
assert "No canaries" in bot.replied[0]
def test_list_populated(self):
bot = _FakeBot()
store = {
"api-key": {"type": "token", "value": "abc", "created": "now"},
"db-cred": {"type": "basic", "value": {"user": "x", "pass": "y"}, "created": "now"},
}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary list")))
assert "api-key" in bot.replied[0]
assert "db-cred" in bot.replied[0]
def test_list_requires_channel(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _pm("!canary list")))
assert "channel" in bot.replied[0].lower()
# -- Command: info -----------------------------------------------------------
class TestCmdInfo:
def test_info_exists(self):
bot = _FakeBot()
store = {"mykey": {"type": "token", "value": "a" * 40, "created": "2026-02-20T14:00:00"}}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary info mykey")))
assert "mykey" in bot.replied[0]
assert "a" * 40 in bot.replied[0]
def test_info_missing(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary info nope")))
assert "No canary" in bot.replied[0]
def test_info_no_label(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary info")))
assert "Usage" in bot.replied[0]
def test_info_requires_channel(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _pm("!canary info mykey")))
assert "channel" in bot.replied[0].lower()
# -- Command: del ------------------------------------------------------------
class TestCmdDel:
def test_del_success(self):
bot = _FakeBot(admin=True)
store = {"victim": {"type": "token", "value": "x", "created": "now"}}
_save(bot, "#ops", store)
asyncio.run(cmd_canary(bot, _msg("!canary del victim")))
assert "Deleted" in bot.replied[0]
assert _load(bot, "#ops") == {}
def test_del_nonexistent(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary del nope")))
assert "No canary" in bot.replied[0]
def test_del_requires_admin(self):
bot = _FakeBot(admin=False)
asyncio.run(cmd_canary(bot, _msg("!canary del something")))
assert "Permission denied" in bot.replied[0]
def test_del_requires_channel(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _pm("!canary del something")))
assert "channel" in bot.replied[0].lower()
def test_del_no_label(self):
bot = _FakeBot(admin=True)
asyncio.run(cmd_canary(bot, _msg("!canary del")))
assert "Usage" in bot.replied[0]
# -- Command: usage ----------------------------------------------------------
class TestCmdUsage:
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary")))
assert "Usage" in bot.replied[0]
def test_unknown_subcommand(self):
bot = _FakeBot()
asyncio.run(cmd_canary(bot, _msg("!canary foobar")))
assert "Usage" in bot.replied[0]