Files
derp/tests/test_internetdb.py
user 3de3f054df feat: add internetdb plugin (Shodan InternetDB host recon)
Free, keyless API returning open ports, hostnames, CPEs, tags, and
known CVEs for any public IP. All requests routed through SOCKS5.
21 test cases (927 total).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 17:41:51 +01:00

282 lines
8.6 KiB
Python

"""Tests for the InternetDB plugin (Shodan free host recon)."""
import asyncio
import importlib.util
import json
import sys
import urllib.error
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
# The path is built relative to this test file: the "plugins" directory
# that sits next to this file's own directory.
_spec = importlib.util.spec_from_file_location(
"plugins.internetdb",
Path(__file__).resolve().parent.parent / "plugins" / "internetdb.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register the module under its spec name *before* executing it, so the
# `from plugins.internetdb import ...` statement below can be satisfied
# from sys.modules.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
# `_mod` stays in scope on purpose: the tests patch attributes on it with
# patch.object(_mod, ...).
from plugins.internetdb import ( # noqa: E402
_fetch,
_format_result,
cmd_internetdb,
)
# -- Sample API responses ----------------------------------------------------
# Each sample is a dict with the same six keys: cpes, hostnames, ip, ports,
# tags, vulns.

# Every field populated. Ports are listed out of order on purpose: the
# formatter tests expect them rendered as "Ports: 53, 443".
SAMPLE_FULL = {
"cpes": ["cpe:/a:apache:http_server:2.4.41", "cpe:/a:openssl:openssl:1.1.1"],
"hostnames": ["dns.google"],
"ip": "8.8.8.8",
"ports": [443, 53],
"tags": ["cloud"],
"vulns": ["CVE-2021-23017", "CVE-2021-3449"],
}
# Only a single open port; all other lists are empty.
SAMPLE_MINIMAL = {
"cpes": [],
"hostnames": [],
"ip": "203.0.113.1",
"ports": [80],
"tags": [],
"vulns": [],
}
# Nothing but the IP; the formatter test expects the output to be the bare IP.
SAMPLE_EMPTY = {
"cpes": [],
"hostnames": [],
"ip": "198.51.100.1",
"ports": [],
"tags": [],
"vulns": [],
}
# 15 CVEs (CVE-2021-0 .. CVE-2021-14); used to check the "+5 more" truncation.
SAMPLE_MANY_VULNS = {
"cpes": [],
"hostnames": ["vuln.example.com"],
"ip": "192.0.2.1",
"ports": [22, 80, 443],
"tags": ["self-signed", "eol-os"],
"vulns": [f"CVE-2021-{i}" for i in range(15)],
}
# 8 hostnames (host0 .. host7); used to check that only the first 5 are shown.
SAMPLE_MANY_HOSTNAMES = {
"cpes": [],
"hostnames": [f"host{i}.example.com" for i in range(8)],
"ip": "192.0.2.2",
"ports": [80],
"tags": [],
"vulns": [],
}
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
    """Stand-in for the bot that records replies instead of sending them."""

    def __init__(self):
        # Reply texts, in the order reply() received them.
        self.replied: list[str] = list()

    async def reply(self, message, text: str) -> None:
        """Record *text*; *message* is accepted but not used."""
        self.replied += [text]
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a PRIVMSG ``Message`` from *nick* to *target* carrying *text*."""
    sender = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=sender,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
class _FakeResp:
    """Canned response object for mocking urlopen.

    Provides ``status``, ``read()`` and the context-manager protocol.
    """

    def __init__(self, data: bytes, status: int = 200):
        self.status = status
        self._data = data

    def __enter__(self):
        return self

    def __exit__(self, *a):
        # Returns None (falsy), so an exception raised inside the ``with``
        # body is not suppressed.
        return None

    def read(self):
        """Return the payload given at construction time."""
        return self._data
# ---------------------------------------------------------------------------
# TestFetch
# ---------------------------------------------------------------------------
class TestFetch:
    """Exercise ``_fetch`` with the plugin module's ``_urlopen`` patched out."""

    def test_success(self):
        """A JSON body from the (mocked) response comes back as a dict."""
        body = json.dumps(SAMPLE_FULL).encode()
        with patch.object(_mod, "_urlopen", return_value=_FakeResp(body)):
            result = _fetch("8.8.8.8")
        assert result == SAMPLE_FULL

    def test_not_found(self):
        """An HTTP 404 from ``_urlopen`` yields ``None``."""
        exc = urllib.error.HTTPError(
            "https://internetdb.shodan.io/192.0.2.1", 404, "Not Found", {}, None,
        )
        with patch.object(_mod, "_urlopen", side_effect=exc):
            result = _fetch("192.0.2.1")
        assert result is None

    def test_server_error_raises(self):
        """An HTTP 500 from ``_urlopen`` propagates out of ``_fetch``."""
        exc = urllib.error.HTTPError(
            "https://internetdb.shodan.io/192.0.2.1", 500, "Server Error", {}, None,
        )
        with patch.object(_mod, "_urlopen", side_effect=exc):
            # Keep the try body to the one call that may raise; the else
            # branch fails the test explicitly (not via ``assert False``,
            # which is removed when Python runs with -O).
            try:
                _fetch("192.0.2.1")
            except urllib.error.HTTPError as e:
                assert e.code == 500
            else:
                raise AssertionError("Expected HTTPError")

    def test_builds_correct_url(self):
        """The first positional argument to ``_urlopen`` is the lookup URL."""
        calls = []

        def _mock_urlopen(url, **kw):
            calls.append(url)
            return _FakeResp(json.dumps(SAMPLE_MINIMAL).encode())

        with patch.object(_mod, "_urlopen", side_effect=_mock_urlopen):
            _fetch("203.0.113.1")
        assert calls[0] == "https://internetdb.shodan.io/203.0.113.1"
# ---------------------------------------------------------------------------
# TestFormatResult
# ---------------------------------------------------------------------------
class TestFormatResult:
    """Checks on the text produced by ``_format_result``."""

    def test_full_response(self):
        """Each populated field of SAMPLE_FULL appears in the output."""
        out = _format_result("8.8.8.8", SAMPLE_FULL)
        expected = (
            "8.8.8.8 -- dns.google",
            "Ports: 53, 443",
            "cpe:/a:apache:http_server:2.4.41",
            "Tags: cloud",
            "CVE-2021-23017",
        )
        for fragment in expected:
            assert fragment in out

    def test_minimal_response(self):
        """With only a port present, the optional sections are omitted."""
        out = _format_result("203.0.113.1", SAMPLE_MINIMAL)
        for fragment in ("203.0.113.1", "Ports: 80"):
            assert fragment in out
        # No hostnames, CPEs, tags, or vulns sections
        for absent in ("--", "CPEs:", "Tags:", "CVEs:"):
            assert absent not in out

    def test_empty_response(self):
        """With every list empty, the output is just the IP."""
        assert _format_result("198.51.100.1", SAMPLE_EMPTY) == "198.51.100.1"

    def test_many_vulns_truncated(self):
        """15 CVEs in: the output carries a "+5 more" marker."""
        out = _format_result("192.0.2.1", SAMPLE_MANY_VULNS)
        assert "+5 more" in out
        # First 10 shown
        for cve in ("CVE-2021-0", "CVE-2021-9"):
            assert cve in out

    def test_many_hostnames_truncated(self):
        """8 hostnames in: host0 and host4 are shown, host5 is not."""
        out = _format_result("192.0.2.2", SAMPLE_MANY_HOSTNAMES)
        # Only first 5 hostnames shown
        for shown in ("host0.example.com", "host4.example.com"):
            assert shown in out
        assert "host5.example.com" not in out

    def test_ports_sorted(self):
        """Ports given out of order are rendered in ascending order."""
        unsorted = dict(SAMPLE_FULL, ports=[8080, 22, 443, 80])
        out = _format_result("8.8.8.8", unsorted)
        assert "Ports: 22, 80, 443, 8080" in out

    def test_many_cpes_truncated(self):
        """12 CPEs in: vendor0 and vendor7 are shown, vendor8 is not."""
        cpes = [f"cpe:/a:vendor{i}:prod" for i in range(12)]
        out = _format_result("198.51.100.1", dict(SAMPLE_EMPTY, cpes=cpes))
        for shown in ("cpe:/a:vendor0:prod", "cpe:/a:vendor7:prod"):
            assert shown in out
        assert "cpe:/a:vendor8:prod" not in out
# ---------------------------------------------------------------------------
# TestCommand
# ---------------------------------------------------------------------------
class TestCommand:
    """Drive ``cmd_internetdb`` against a ``_FakeBot`` and inspect its replies."""

    def _run(self, bot, msg, data=None, side_effect=None):
        """Run the command with ``_fetch`` patched.

        The patched ``_fetch`` returns *data* unless *side_effect* is given,
        in which case it is installed as the mock's side effect (e.g. an
        exception instance to raise).
        """
        with patch.object(
            _mod, "_fetch", return_value=data, side_effect=side_effect,
        ):
            asyncio.run(cmd_internetdb(bot, msg))

    def test_valid_ip(self):
        """A public IPv4 address with data produces a formatted reply."""
        bot = _FakeBot()
        self._run(bot, _msg("!internetdb 8.8.8.8"), data=SAMPLE_FULL)
        assert "dns.google" in bot.replied[0]
        assert "Ports:" in bot.replied[0]

    def test_no_data(self):
        """``_fetch`` returning None is reported as "no data available"."""
        bot = _FakeBot()
        self._run(bot, _msg("!internetdb 4.4.4.4"), data=None)
        assert "no data available" in bot.replied[0]

    def test_no_args(self):
        """Without an argument the command replies with usage text."""
        bot = _FakeBot()
        asyncio.run(cmd_internetdb(bot, _msg("!internetdb")))
        assert "Usage:" in bot.replied[0]

    def test_invalid_ip(self):
        """A non-IP argument is rejected."""
        bot = _FakeBot()
        asyncio.run(cmd_internetdb(bot, _msg("!internetdb notanip")))
        assert "Invalid IP" in bot.replied[0]

    def test_private_ip(self):
        """An RFC 1918 address is refused."""
        bot = _FakeBot()
        asyncio.run(cmd_internetdb(bot, _msg("!internetdb 192.168.1.1")))
        assert "private/loopback" in bot.replied[0]

    def test_loopback(self):
        """The loopback address is refused."""
        bot = _FakeBot()
        asyncio.run(cmd_internetdb(bot, _msg("!internetdb 127.0.0.1")))
        assert "private/loopback" in bot.replied[0]

    def test_ipv6(self):
        """An IPv6 address is accepted and formatted like IPv4."""
        bot = _FakeBot()
        self._run(bot, _msg("!internetdb 2606:4700::1"), data=SAMPLE_MINIMAL)
        assert "Ports:" in bot.replied[0]

    def test_network_error(self):
        """An exception from ``_fetch`` becomes a "lookup failed" reply."""
        bot = _FakeBot()
        self._run(
            bot, _msg("!internetdb 8.8.8.8"),
            side_effect=ConnectionError("timeout"),
        )
        assert "lookup failed" in bot.replied[0]

    def test_normalized_ip(self):
        """An IPv4 octet with leading zeros (8.008.8.8) is rejected.

        Despite the test name, no normalization is exercised here: the
        address is expected to be reported as invalid and never looked up.
        """
        calls = []

        def _mock_fetch(addr):
            calls.append(addr)
            return SAMPLE_MINIMAL

        bot = _FakeBot()
        with patch.object(_mod, "_fetch", side_effect=_mock_fetch):
            asyncio.run(cmd_internetdb(bot, _msg("!internetdb 8.008.8.8")))
        # Leading zeros rejected by Python's strict parser -> "Invalid IP"
        assert "Invalid IP" in bot.replied[0]
        # A rejected address must not trigger a lookup.
        assert calls == []

    def test_empty_result(self):
        """A response with no data beyond the IP still produces a reply."""
        bot = _FakeBot()
        self._run(bot, _msg("!internetdb 198.51.100.1"), data=SAMPLE_EMPTY)
        assert "198.51.100.1" in bot.replied[0]