canary: generate realistic fake credentials (token/aws/basic) for planting as canary tripwires. Per-channel state persistence. tcping: TCP connect latency probe through SOCKS5 proxy with min/avg/max reporting. Proxy-compatible alternative to traceroute. archive: save URLs to Wayback Machine via Save Page Now API, routed through SOCKS5 proxy. resolve: bulk DNS resolution (up to 10 hosts) via TCP DNS through SOCKS5 proxy with concurrent asyncio.gather. 83 new tests (1010 total), docs updated.
204 lines
6.1 KiB
Python
204 lines
6.1 KiB
Python
"""Tests for the TCP ping plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import sys
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from derp.irc import Message
|
|
|
|
# plugins/ is not a Python package -- load the module from its file path and
# register it in sys.modules as "plugins.tcping" so the import below resolves.
_spec = importlib.util.spec_from_file_location(
    "plugins.tcping",
    Path(__file__).resolve().parent.parent / "plugins" / "tcping.py",
)
if _spec is None or _spec.loader is None:
    # spec_from_file_location() returns None when no loader can be determined;
    # fail with a clear message instead of an AttributeError further down.
    raise ImportError("cannot load plugins/tcping.py")
_mod = importlib.util.module_from_spec(_spec)
# Register before executing so the module is importable by name afterwards.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
|
|
|
|
from plugins.tcping import ( # noqa: E402
|
|
_is_internal,
|
|
_probe,
|
|
_validate_host,
|
|
cmd_tcping,
|
|
)
|
|
|
|
# -- Helpers -----------------------------------------------------------------
|
|
|
|
class _FakeBot:
    """Minimal bot stand-in that records every reply text it is asked to send."""

    def __init__(self) -> None:
        # Reply texts in the order reply() was awaited.
        self.replied: list[str] = []

    async def reply(self, message, text: str) -> None:
        """Record *text*; *message* is accepted but not inspected."""
        self.replied.append(text)
|
|
|
|
|
|
def _msg(text: str) -> Message:
    """Build a PRIVMSG ``Message`` from nick ``alice`` to ``#test`` carrying *text*."""
    return Message(
        raw="",
        prefix="alice!~alice@host",
        nick="alice",
        command="PRIVMSG",
        params=["#test", text],
        tags={},
    )
|
|
|
|
|
|
# -- Validation --------------------------------------------------------------
|
|
|
|
class TestValidateHost:
    """Checks for ``_validate_host`` on well-formed and malformed host strings."""

    def test_valid_ip(self):
        """A dotted IPv4 literal is accepted."""
        assert _validate_host("93.184.216.34") is True

    def test_valid_domain(self):
        """A dotted domain name is accepted."""
        assert _validate_host("example.com") is True

    def test_invalid_no_dot(self):
        """A bare name without a dot is rejected."""
        assert _validate_host("localhost") is False

    def test_invalid_chars(self):
        """Whitespace and punctuation make the host invalid."""
        assert _validate_host("bad host!") is False
|
|
|
|
|
|
class TestIsInternal:
    """Checks for ``_is_internal`` on private, loopback, public and named hosts."""

    def test_private(self):
        """An RFC 1918 address is reported as internal."""
        assert _is_internal("192.168.1.1") is True

    def test_loopback(self):
        """The loopback address is reported as internal."""
        assert _is_internal("127.0.0.1") is True

    def test_public(self):
        """A public address is not internal."""
        assert _is_internal("8.8.8.8") is False

    def test_domain(self):
        """A domain name (not an IP literal) is not reported as internal."""
        assert _is_internal("example.com") is False
|
|
|
|
|
|
# -- Probe -------------------------------------------------------------------
|
|
|
|
class TestProbe:
    """Exercise ``_probe`` with the module's ``_open_connection`` patched out."""

    def test_success(self):
        """A successful connect yields a non-negative RTT and closes the writer."""
        writer = MagicMock()
        # AsyncMock so that an ``await writer.wait_closed()`` in the probe works.
        writer.wait_closed = AsyncMock()
        mock_open = AsyncMock(return_value=(MagicMock(), writer))

        with patch.object(_mod, "_open_connection", mock_open):
            rtt = asyncio.run(_probe("example.com", 443, 5.0))

        assert rtt is not None
        assert rtt >= 0
        writer.close.assert_called_once()

    def test_timeout(self):
        """A timeout while connecting makes the probe return None."""
        mock_open = AsyncMock(side_effect=asyncio.TimeoutError())

        with patch.object(_mod, "_open_connection", mock_open):
            rtt = asyncio.run(_probe("example.com", 443, 0.1))

        assert rtt is None

    def test_connection_error(self):
        """An OSError while connecting makes the probe return None."""
        mock_open = AsyncMock(side_effect=OSError("refused"))

        with patch.object(_mod, "_open_connection", mock_open):
            rtt = asyncio.run(_probe("example.com", 443, 5.0))

        assert rtt is None
|
|
|
|
|
|
# -- Command -----------------------------------------------------------------
|
|
|
|
class TestCmdTcping:
    """Tests for the ``!tcping`` command handler, driven through a fake bot."""

    @staticmethod
    def _run(text: str) -> list[str]:
        """Run ``cmd_tcping`` on a message with *text* and return the replies.

        Fails with a readable assertion (rather than an IndexError in the
        caller) when the command produced no reply at all.
        """
        bot = _FakeBot()
        asyncio.run(cmd_tcping(bot, _msg(text)))
        assert bot.replied, f"no reply to {text!r}"
        return bot.replied

    @staticmethod
    def _connecting_open() -> AsyncMock:
        """Return an ``_open_connection`` stand-in that always connects."""
        writer = MagicMock()
        # AsyncMock so that an ``await writer.wait_closed()`` in the plugin works.
        writer.wait_closed = AsyncMock()
        return AsyncMock(return_value=(MagicMock(), writer))

    def test_no_args(self):
        """Without arguments the command replies with usage help."""
        replies = self._run("!tcping")
        assert "Usage" in replies[0]

    def test_invalid_host(self):
        """A host that fails validation is rejected."""
        replies = self._run("!tcping notahost")
        assert "Invalid host" in replies[0]

    def test_internal_host(self):
        """A private address is refused as internal."""
        replies = self._run("!tcping 192.168.1.1")
        assert "internal" in replies[0].lower()

    def test_invalid_port(self):
        """An out-of-range port is rejected."""
        replies = self._run("!tcping example.com 99999")
        assert "Invalid port" in replies[0]

    def test_invalid_port_string(self):
        """A non-numeric port is rejected."""
        replies = self._run("!tcping example.com abc")
        assert "Invalid port" in replies[0]

    def test_success_default(self):
        """Default 3 probes, all succeed."""
        with patch.object(_mod, "_open_connection", self._connecting_open()):
            replies = self._run("!tcping example.com")

        assert len(replies) == 1
        reply = replies[0]
        assert "example.com:443" in reply
        assert "3 probes" in reply
        assert "min/avg/max" in reply

    def test_custom_port_and_count(self):
        """An explicit port and probe count show up in the reply."""
        with patch.object(_mod, "_open_connection", self._connecting_open()):
            replies = self._run("!tcping example.com 22 5")

        reply = replies[0]
        assert "example.com:22" in reply
        assert "5 probes" in reply

    def test_all_timeout(self):
        """When every probe times out the reply says so."""
        mock_open = AsyncMock(side_effect=asyncio.TimeoutError())

        with patch.object(_mod, "_open_connection", mock_open):
            replies = self._run("!tcping example.com")

        assert "timed out" in replies[0]

    def test_count_clamped(self):
        """Count > MAX_COUNT gets clamped."""
        with patch.object(_mod, "_open_connection", self._connecting_open()):
            replies = self._run("!tcping example.com 443 99")

        # MAX_COUNT is 10
        assert "10 probes" in replies[0]

    def test_partial_timeout(self):
        """Some probes succeed, some fail."""
        calls = 0
        writer = MagicMock()
        writer.wait_closed = AsyncMock()

        async def flaky_open(*args, **kwargs):
            # Only the second connection attempt times out; the stub accepts
            # any arguments so it does not pin the plugin's call signature.
            nonlocal calls
            calls += 1
            if calls == 2:
                raise asyncio.TimeoutError()
            return MagicMock(), writer

        with patch.object(_mod, "_open_connection", flaky_open):
            replies = self._run("!tcping example.com 443 3")

        reply = replies[0]
        assert "timeout" in reply
        assert "min/avg/max" in reply
|