feat: add internetdb plugin (Shodan InternetDB host recon)
Free, keyless API returning open ports, hostnames, CPEs, tags, and known CVEs for any public IP. All requests routed through SOCKS5. 21 test cases (927 total). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
281
tests/test_internetdb.py
Normal file
281
tests/test_internetdb.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Tests for the InternetDB plugin (Shodan free host recon)."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.internetdb",
|
||||
Path(__file__).resolve().parent.parent / "plugins" / "internetdb.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.internetdb import ( # noqa: E402
|
||||
_fetch,
|
||||
_format_result,
|
||||
cmd_internetdb,
|
||||
)
|
||||
|
||||
# -- Sample API responses ----------------------------------------------------
|
||||
|
||||
SAMPLE_FULL = {
|
||||
"cpes": ["cpe:/a:apache:http_server:2.4.41", "cpe:/a:openssl:openssl:1.1.1"],
|
||||
"hostnames": ["dns.google"],
|
||||
"ip": "8.8.8.8",
|
||||
"ports": [443, 53],
|
||||
"tags": ["cloud"],
|
||||
"vulns": ["CVE-2021-23017", "CVE-2021-3449"],
|
||||
}
|
||||
|
||||
SAMPLE_MINIMAL = {
|
||||
"cpes": [],
|
||||
"hostnames": [],
|
||||
"ip": "203.0.113.1",
|
||||
"ports": [80],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
SAMPLE_EMPTY = {
|
||||
"cpes": [],
|
||||
"hostnames": [],
|
||||
"ip": "198.51.100.1",
|
||||
"ports": [],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
SAMPLE_MANY_VULNS = {
|
||||
"cpes": [],
|
||||
"hostnames": ["vuln.example.com"],
|
||||
"ip": "192.0.2.1",
|
||||
"ports": [22, 80, 443],
|
||||
"tags": ["self-signed", "eol-os"],
|
||||
"vulns": [f"CVE-2021-{i}" for i in range(15)],
|
||||
}
|
||||
|
||||
SAMPLE_MANY_HOSTNAMES = {
|
||||
"cpes": [],
|
||||
"hostnames": [f"host{i}.example.com" for i in range(8)],
|
||||
"ip": "192.0.2.2",
|
||||
"ports": [80],
|
||||
"tags": [],
|
||||
"vulns": [],
|
||||
}
|
||||
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeBot:
|
||||
def __init__(self):
|
||||
self.replied: list[str] = []
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
"""Minimal file-like response for urlopen mocking."""
|
||||
|
||||
def __init__(self, data: bytes, status: int = 200):
|
||||
self._data = data
|
||||
self.status = status
|
||||
|
||||
def read(self):
|
||||
return self._data
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *a):
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFetch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFetch:
|
||||
def test_success(self):
|
||||
body = json.dumps(SAMPLE_FULL).encode()
|
||||
with patch.object(_mod, "_urlopen", return_value=_FakeResp(body)):
|
||||
result = _fetch("8.8.8.8")
|
||||
assert result == SAMPLE_FULL
|
||||
|
||||
def test_not_found(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"https://internetdb.shodan.io/192.0.2.1", 404, "Not Found", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
result = _fetch("192.0.2.1")
|
||||
assert result is None
|
||||
|
||||
def test_server_error_raises(self):
|
||||
exc = urllib.error.HTTPError(
|
||||
"https://internetdb.shodan.io/192.0.2.1", 500, "Server Error", {}, None,
|
||||
)
|
||||
with patch.object(_mod, "_urlopen", side_effect=exc):
|
||||
try:
|
||||
_fetch("192.0.2.1")
|
||||
assert False, "Expected HTTPError"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 500
|
||||
|
||||
def test_builds_correct_url(self):
|
||||
calls = []
|
||||
|
||||
def _mock_urlopen(url, **kw):
|
||||
calls.append(url)
|
||||
return _FakeResp(json.dumps(SAMPLE_MINIMAL).encode())
|
||||
|
||||
with patch.object(_mod, "_urlopen", side_effect=_mock_urlopen):
|
||||
_fetch("203.0.113.1")
|
||||
assert calls[0] == "https://internetdb.shodan.io/203.0.113.1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFormatResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFormatResult:
|
||||
def test_full_response(self):
|
||||
result = _format_result("8.8.8.8", SAMPLE_FULL)
|
||||
assert "8.8.8.8 -- dns.google" in result
|
||||
assert "Ports: 53, 443" in result
|
||||
assert "cpe:/a:apache:http_server:2.4.41" in result
|
||||
assert "Tags: cloud" in result
|
||||
assert "CVE-2021-23017" in result
|
||||
|
||||
def test_minimal_response(self):
|
||||
result = _format_result("203.0.113.1", SAMPLE_MINIMAL)
|
||||
assert "203.0.113.1" in result
|
||||
assert "Ports: 80" in result
|
||||
# No hostnames, CPEs, tags, or vulns sections
|
||||
assert "--" not in result
|
||||
assert "CPEs:" not in result
|
||||
assert "Tags:" not in result
|
||||
assert "CVEs:" not in result
|
||||
|
||||
def test_empty_response(self):
|
||||
result = _format_result("198.51.100.1", SAMPLE_EMPTY)
|
||||
assert result == "198.51.100.1"
|
||||
|
||||
def test_many_vulns_truncated(self):
|
||||
result = _format_result("192.0.2.1", SAMPLE_MANY_VULNS)
|
||||
assert "+5 more" in result
|
||||
# First 10 shown
|
||||
assert "CVE-2021-0" in result
|
||||
assert "CVE-2021-9" in result
|
||||
|
||||
def test_many_hostnames_truncated(self):
|
||||
result = _format_result("192.0.2.2", SAMPLE_MANY_HOSTNAMES)
|
||||
# Only first 5 hostnames shown
|
||||
assert "host0.example.com" in result
|
||||
assert "host4.example.com" in result
|
||||
assert "host5.example.com" not in result
|
||||
|
||||
def test_ports_sorted(self):
|
||||
data = {**SAMPLE_FULL, "ports": [8080, 22, 443, 80]}
|
||||
result = _format_result("8.8.8.8", data)
|
||||
assert "Ports: 22, 80, 443, 8080" in result
|
||||
|
||||
def test_many_cpes_truncated(self):
|
||||
data = {**SAMPLE_EMPTY, "cpes": [f"cpe:/a:vendor{i}:prod" for i in range(12)]}
|
||||
result = _format_result("198.51.100.1", data)
|
||||
assert "cpe:/a:vendor0:prod" in result
|
||||
assert "cpe:/a:vendor7:prod" in result
|
||||
assert "cpe:/a:vendor8:prod" not in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCommand:
|
||||
def _run(self, bot, msg, data=None, side_effect=None):
|
||||
"""Run command with mocked _fetch."""
|
||||
if side_effect is not None:
|
||||
with patch.object(_mod, "_fetch", side_effect=side_effect):
|
||||
asyncio.run(cmd_internetdb(bot, msg))
|
||||
else:
|
||||
with patch.object(_mod, "_fetch", return_value=data):
|
||||
asyncio.run(cmd_internetdb(bot, msg))
|
||||
|
||||
def test_valid_ip(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 8.8.8.8"), data=SAMPLE_FULL)
|
||||
assert "dns.google" in bot.replied[0]
|
||||
assert "Ports:" in bot.replied[0]
|
||||
|
||||
def test_no_data(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 4.4.4.4"), data=None)
|
||||
assert "no data available" in bot.replied[0]
|
||||
|
||||
def test_no_args(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_invalid_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb notanip")))
|
||||
assert "Invalid IP" in bot.replied[0]
|
||||
|
||||
def test_private_ip(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 192.168.1.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_loopback(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 127.0.0.1")))
|
||||
assert "private/loopback" in bot.replied[0]
|
||||
|
||||
def test_ipv6(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 2606:4700::1"), data=SAMPLE_MINIMAL)
|
||||
assert "Ports:" in bot.replied[0]
|
||||
|
||||
def test_network_error(self):
|
||||
bot = _FakeBot()
|
||||
self._run(
|
||||
bot, _msg("!internetdb 8.8.8.8"),
|
||||
side_effect=ConnectionError("timeout"),
|
||||
)
|
||||
assert "lookup failed" in bot.replied[0]
|
||||
|
||||
def test_normalized_ip(self):
|
||||
"""ipaddress normalization (e.g. ::ffff:8.8.8.8 -> mapped)."""
|
||||
calls = []
|
||||
|
||||
def _mock_fetch(addr):
|
||||
calls.append(addr)
|
||||
return SAMPLE_MINIMAL
|
||||
|
||||
bot = _FakeBot()
|
||||
with patch.object(_mod, "_fetch", side_effect=_mock_fetch):
|
||||
asyncio.run(cmd_internetdb(bot, _msg("!internetdb 8.008.8.8")))
|
||||
# Leading zeros rejected by Python's strict parser -> "Invalid IP"
|
||||
assert "Invalid IP" in bot.replied[0]
|
||||
|
||||
def test_empty_result(self):
|
||||
bot = _FakeBot()
|
||||
self._run(bot, _msg("!internetdb 198.51.100.1"), data=SAMPLE_EMPTY)
|
||||
assert "198.51.100.1" in bot.replied[0]
|
||||
Reference in New Issue
Block a user