canary: generate realistic fake credentials (token/aws/basic) for planting as canary tripwires. Per-channel state persistence. tcping: TCP connect latency probe through SOCKS5 proxy with min/avg/max reporting. Proxy-compatible alternative to traceroute. archive: save URLs to Wayback Machine via Save Page Now API, routed through SOCKS5 proxy. resolve: bulk DNS resolution (up to 10 hosts) via TCP DNS through SOCKS5 proxy with concurrent asyncio.gather. 83 new tests (1010 total), docs updated.
303 lines
9.5 KiB
Python
303 lines
9.5 KiB
Python
"""Tests for the canary token generator plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from derp.irc import Message
|
|
|
|
# plugins/ is not an importable package, so build the module directly from
# its source file and register it in sys.modules under the dotted name that
# the ``from plugins.canary import ...`` statement below relies on.
_here = Path(__file__).resolve().parent
_spec = importlib.util.spec_from_file_location(
    "plugins.canary",
    _here.parent / "plugins" / "canary.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
|
|
|
|
from plugins.canary import ( # noqa: E402
|
|
_gen_aws,
|
|
_gen_basic,
|
|
_gen_token,
|
|
_load,
|
|
_save,
|
|
cmd_canary,
|
|
)
|
|
|
|
# -- Helpers -----------------------------------------------------------------
|
|
|
|
class _FakeState:
|
|
"""In-memory stand-in for bot.state."""
|
|
|
|
def __init__(self):
|
|
self._store: dict[str, dict[str, str]] = {}
|
|
|
|
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
|
return self._store.get(plugin, {}).get(key, default)
|
|
|
|
def set(self, plugin: str, key: str, value: str) -> None:
|
|
self._store.setdefault(plugin, {})[key] = value
|
|
|
|
def delete(self, plugin: str, key: str) -> bool:
|
|
try:
|
|
del self._store[plugin][key]
|
|
return True
|
|
except KeyError:
|
|
return False
|
|
|
|
def keys(self, plugin: str) -> list[str]:
|
|
return sorted(self._store.get(plugin, {}).keys())
|
|
|
|
|
|
class _FakeBot:
|
|
"""Minimal bot stand-in that captures sent/replied messages."""
|
|
|
|
def __init__(self, *, admin: bool = False):
|
|
self.sent: list[tuple[str, str]] = []
|
|
self.replied: list[str] = []
|
|
self.state = _FakeState()
|
|
self._admin = admin
|
|
|
|
async def send(self, target: str, text: str) -> None:
|
|
self.sent.append((target, text))
|
|
|
|
async def reply(self, message, text: str) -> None:
|
|
self.replied.append(text)
|
|
|
|
def _is_admin(self, message) -> bool:
|
|
return self._admin
|
|
|
|
|
|
def _msg(text: str, nick: str = "alice", target: str = "#ops") -> Message:
    """Build a PRIVMSG from *nick* addressed to *target* (a channel by default)."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
|
|
|
|
|
|
def _pm(text: str, nick: str = "alice") -> Message:
    """Build a PRIVMSG from *nick* whose target is the nick ``botname``, not a channel."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=["botname", text],
        tags={},
    )
|
|
|
|
|
|
# -- Token generators -------------------------------------------------------
|
|
|
|
class TestGenToken:
    """Checks on the shape and uniqueness of ``_gen_token`` output."""

    def test_length(self):
        """A generated token is exactly 40 characters long."""
        tok = _gen_token()
        assert len(tok) == 40

    def test_hex(self):
        """Every character of a generated token is a hexadecimal digit."""
        tok = _gen_token()
        # int(tok, 16) alone is too lenient for this check: it also accepts
        # a leading sign, a "0x" prefix, underscores between digits and
        # surrounding whitespace. Check the characters explicitly instead.
        assert tok, "token must not be empty"
        assert set(tok) <= set("0123456789abcdefABCDEF")

    def test_unique(self):
        """Two consecutive calls yield different tokens."""
        assert _gen_token() != _gen_token()
|
|
|
|
|
|
class TestGenAws:
    """Checks on the key pair returned by ``_gen_aws``."""

    def test_access_key_format(self):
        """The access key starts with ``AKIA`` and is 20 characters long."""
        access_key = _gen_aws()["access_key"]
        assert access_key.startswith("AKIA")
        assert len(access_key) == 20

    def test_secret_key_present(self):
        """The secret key is longer than 20 characters."""
        secret_key = _gen_aws()["secret_key"]
        assert len(secret_key) > 20
|
|
|
|
|
|
class TestGenBasic:
    """Checks on the user/pass pair returned by ``_gen_basic``."""

    def test_user_format(self):
        """The user name starts with ``svc`` and is 8 characters long."""
        user = _gen_basic()["user"]
        assert user.startswith("svc")
        assert len(user) == 8

    def test_pass_present(self):
        """The password is longer than 10 characters."""
        password = _gen_basic()["pass"]
        assert len(password) > 10
|
|
|
|
|
|
# -- State helpers -----------------------------------------------------------
|
|
|
|
class TestStateHelpers:
    """Round-trip and fallback behaviour of ``_save`` / ``_load``."""

    def test_save_and_load(self):
        """What ``_save`` writes for a channel, ``_load`` returns unchanged."""
        bot = _FakeBot()
        entry = {"type": "token", "value": "abc", "created": "now"}
        expected = {"mykey": entry}
        _save(bot, "#ops", expected)
        assert _load(bot, "#ops") == expected

    def test_load_empty(self):
        """Loading a channel with nothing stored yields an empty dict."""
        bot = _FakeBot()
        result = _load(bot, "#ops")
        assert result == {}

    def test_load_bad_json(self):
        """A stored value that is not valid JSON is treated as an empty store."""
        bot = _FakeBot()
        bot.state.set("canary", "#ops", "not json{{{")
        result = _load(bot, "#ops")
        assert result == {}
|
|
|
|
|
|
# -- Command: gen ------------------------------------------------------------
|
|
|
|
class TestCmdGen:
    """Expectations for the ``!canary gen`` subcommand."""

    def test_gen_default_token(self):
        """Without an explicit type, ``gen`` stores a 40-character ``token`` canary."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen db-cred")
        asyncio.run(cmd_canary(bot, request))
        assert len(bot.replied) == 1
        reply = bot.replied[0]
        assert "db-cred" in reply
        assert "token" in reply
        store = _load(bot, "#ops")
        assert "db-cred" in store
        entry = store["db-cred"]
        assert entry["type"] == "token"
        assert len(entry["value"]) == 40

    def test_gen_aws(self):
        """``gen aws <label>`` replies with an AKIA key and stores type ``aws``."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen aws staging-key")
        asyncio.run(cmd_canary(bot, request))
        reply = bot.replied[0]
        assert "staging-key" in reply
        assert "AKIA" in reply
        store = _load(bot, "#ops")
        assert store["staging-key"]["type"] == "aws"

    def test_gen_basic(self):
        """``gen basic <label>`` stores type ``basic`` with a ``user`` field."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen basic svc-login")
        asyncio.run(cmd_canary(bot, request))
        assert "svc-login" in bot.replied[0]
        store = _load(bot, "#ops")
        entry = store["svc-login"]
        assert entry["type"] == "basic"
        assert "user" in entry["value"]

    def test_gen_requires_admin(self):
        """A non-admin caller is answered with a permission error."""
        bot = _FakeBot(admin=False)
        request = _msg("!canary gen mytoken")
        asyncio.run(cmd_canary(bot, request))
        assert "Permission denied" in bot.replied[0]

    def test_gen_requires_channel(self):
        """``gen`` sent as a private message gets a reply mentioning "channel"."""
        bot = _FakeBot(admin=True)
        request = _pm("!canary gen mytoken")
        asyncio.run(cmd_canary(bot, request))
        reply = bot.replied[0]
        assert "channel" in reply.lower()

    def test_gen_duplicate(self):
        """Generating the same label twice reports that it already exists."""
        bot = _FakeBot(admin=True)
        asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
        # Drop the first reply so index 0 refers to the second attempt.
        bot.replied.clear()
        asyncio.run(cmd_canary(bot, _msg("!canary gen dup-test")))
        assert "already exists" in bot.replied[0]

    def test_gen_no_label(self):
        """``gen`` with no arguments replies with usage text."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]

    def test_gen_type_no_label(self):
        """``gen aws`` with no label replies with usage text."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen aws")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]

    def test_gen_invalid_label(self):
        """A label containing ``@`` and ``!`` gets a reply mentioning "Label"."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary gen b@d!")
        asyncio.run(cmd_canary(bot, request))
        assert "Label" in bot.replied[0]
|
|
|
|
|
|
# -- Command: list -----------------------------------------------------------
|
|
|
|
class TestCmdList:
    """Expectations for the ``!canary list`` subcommand."""

    def test_list_empty(self):
        """With nothing stored, the reply says there are no canaries."""
        bot = _FakeBot()
        request = _msg("!canary list")
        asyncio.run(cmd_canary(bot, request))
        assert "No canaries" in bot.replied[0]

    def test_list_populated(self):
        """Every stored label appears in the listing reply."""
        bot = _FakeBot()
        token_entry = {"type": "token", "value": "abc", "created": "now"}
        basic_entry = {
            "type": "basic",
            "value": {"user": "x", "pass": "y"},
            "created": "now",
        }
        store = {"api-key": token_entry, "db-cred": basic_entry}
        _save(bot, "#ops", store)
        asyncio.run(cmd_canary(bot, _msg("!canary list")))
        reply = bot.replied[0]
        assert "api-key" in reply
        assert "db-cred" in reply

    def test_list_requires_channel(self):
        """``list`` sent as a private message gets a reply mentioning "channel"."""
        bot = _FakeBot()
        request = _pm("!canary list")
        asyncio.run(cmd_canary(bot, request))
        reply = bot.replied[0]
        assert "channel" in reply.lower()
|
|
|
|
|
|
# -- Command: info -----------------------------------------------------------
|
|
|
|
class TestCmdInfo:
    """Expectations for the ``!canary info`` subcommand."""

    def test_info_exists(self):
        """The reply for a stored label contains both the label and its value."""
        bot = _FakeBot()
        entry = {"type": "token", "value": "a" * 40, "created": "2026-02-20T14:00:00"}
        _save(bot, "#ops", {"mykey": entry})
        asyncio.run(cmd_canary(bot, _msg("!canary info mykey")))
        reply = bot.replied[0]
        assert "mykey" in reply
        assert "a" * 40 in reply

    def test_info_missing(self):
        """An unknown label is reported as not found."""
        bot = _FakeBot()
        request = _msg("!canary info nope")
        asyncio.run(cmd_canary(bot, request))
        assert "No canary" in bot.replied[0]

    def test_info_no_label(self):
        """``info`` with no label replies with usage text."""
        bot = _FakeBot()
        request = _msg("!canary info")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]

    def test_info_requires_channel(self):
        """``info`` sent as a private message gets a reply mentioning "channel"."""
        bot = _FakeBot()
        request = _pm("!canary info mykey")
        asyncio.run(cmd_canary(bot, request))
        reply = bot.replied[0]
        assert "channel" in reply.lower()
|
|
|
|
|
|
# -- Command: del ------------------------------------------------------------
|
|
|
|
class TestCmdDel:
    """Expectations for the ``!canary del`` subcommand."""

    def test_del_success(self):
        """Deleting the only stored canary confirms it and leaves the store empty."""
        bot = _FakeBot(admin=True)
        entry = {"type": "token", "value": "x", "created": "now"}
        _save(bot, "#ops", {"victim": entry})
        asyncio.run(cmd_canary(bot, _msg("!canary del victim")))
        assert "Deleted" in bot.replied[0]
        remaining = _load(bot, "#ops")
        assert remaining == {}

    def test_del_nonexistent(self):
        """Deleting an unknown label is reported as not found."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary del nope")
        asyncio.run(cmd_canary(bot, request))
        assert "No canary" in bot.replied[0]

    def test_del_requires_admin(self):
        """A non-admin caller is answered with a permission error."""
        bot = _FakeBot(admin=False)
        request = _msg("!canary del something")
        asyncio.run(cmd_canary(bot, request))
        assert "Permission denied" in bot.replied[0]

    def test_del_requires_channel(self):
        """``del`` sent as a private message gets a reply mentioning "channel"."""
        bot = _FakeBot(admin=True)
        request = _pm("!canary del something")
        asyncio.run(cmd_canary(bot, request))
        reply = bot.replied[0]
        assert "channel" in reply.lower()

    def test_del_no_label(self):
        """``del`` with no label replies with usage text."""
        bot = _FakeBot(admin=True)
        request = _msg("!canary del")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]
|
|
|
|
|
|
# -- Command: usage ----------------------------------------------------------
|
|
|
|
class TestCmdUsage:
    """Expectations for the usage fallback of ``!canary``."""

    def test_no_args(self):
        """A bare ``!canary`` replies with usage text."""
        bot = _FakeBot()
        request = _msg("!canary")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]

    def test_unknown_subcommand(self):
        """An unrecognised subcommand replies with usage text."""
        bot = _FakeBot()
        request = _msg("!canary foobar")
        asyncio.run(cmd_canary(bot, request))
        assert "Usage" in bot.replied[0]
|