feat: replace MaxMind ASN with iptoasn.com TSV backend

Drop GeoLite2-ASN.mmdb dependency (required license key) in favor of
iptoasn.com ip2asn-v4.tsv (no auth, public domain).  Bisect-based
lookup in pure stdlib, downloaded via SOCKS5 in update-data.sh.
Adds 30 test cases covering load, lookup, and command handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-19 20:43:00 +01:00
parent 6f1f4b2fc8
commit 442fea703c
4 changed files with 394 additions and 42 deletions

258
tests/test_asn.py Normal file
View File

@@ -0,0 +1,258 @@
"""Tests for the ASN lookup plugin (iptoasn.com TSV backend)."""
import asyncio
import importlib.util
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.asn", Path(__file__).resolve().parent.parent / "plugins" / "asn.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.asn import ( # noqa: E402
_ip_to_int,
_load_db,
_lookup,
cmd_asn,
)
# -- Sample TSV data ---------------------------------------------------------
SAMPLE_TSV = """\
1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUDFLARENET
1.0.1.0\t1.0.3.255\t0\tNone\tNot routed
1.0.4.0\t1.0.7.255\t56203\tAU\tGTELECOM
8.8.8.0\t8.8.8.255\t15169\tUS\tGOOGLE
"""
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
def __init__(self):
self.replied: list[str] = []
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _reset_db():
"""Clear module-level DB state between tests."""
_mod._starts.clear()
_mod._ends.clear()
_mod._asns.clear()
_mod._countries.clear()
_mod._orgs.clear()
_mod._loaded = False
def _load_sample(tsv: str = SAMPLE_TSV) -> Path:
"""Write TSV to a temp file, load it, return the path."""
_reset_db()
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
tmp.write(tsv)
tmp.flush()
tmp.close()
p = Path(tmp.name)
_load_db(p)
return p
# ---------------------------------------------------------------------------
# TestIpToInt
# ---------------------------------------------------------------------------
class TestIpToInt:
def test_zero(self):
assert _ip_to_int("0.0.0.0") == 0
def test_one(self):
assert _ip_to_int("0.0.0.1") == 1
def test_max(self):
assert _ip_to_int("255.255.255.255") == 0xFFFFFFFF
def test_known(self):
assert _ip_to_int("1.0.0.0") == 0x01000000
def test_google_dns(self):
assert _ip_to_int("8.8.8.8") == 0x08080808
# ---------------------------------------------------------------------------
# TestLoad
# ---------------------------------------------------------------------------
class TestLoad:
def test_loads_rows(self):
_load_sample()
# 4 rows in TSV, but ASN 0 is skipped -> 3 entries
assert len(_mod._starts) == 3
def test_skips_asn_zero(self):
_load_sample()
for asn in _mod._asns:
assert asn != "AS0"
def test_first_entry(self):
_load_sample()
assert _mod._asns[0] == "AS13335"
assert _mod._orgs[0] == "CLOUDFLARENET"
assert _mod._countries[0] == "US"
def test_missing_file_returns_false(self):
_reset_db()
result = _load_db(Path("/nonexistent/path.tsv"))
assert result is False
assert not _mod._loaded
def test_empty_file(self):
_reset_db()
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False)
tmp.write("")
tmp.close()
result = _load_db(Path(tmp.name))
assert result is True
assert len(_mod._starts) == 0
def test_skips_comments_and_blanks(self):
tsv = "# comment\n\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
_load_sample(tsv)
assert len(_mod._starts) == 1
def test_skips_malformed_rows(self):
tsv = "bad\tdata\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
_load_sample(tsv)
assert len(_mod._starts) == 1
# ---------------------------------------------------------------------------
# TestLookup
# ---------------------------------------------------------------------------
class TestLookup:
def setup_method(self):
_load_sample()
def test_exact_start(self):
result = _lookup("1.0.0.0")
assert result is not None
asn, org, country = result
assert asn == "AS13335"
assert org == "CLOUDFLARENET"
assert country == "US"
def test_mid_range(self):
result = _lookup("1.0.0.128")
assert result is not None
assert result[0] == "AS13335"
def test_exact_end(self):
result = _lookup("1.0.0.255")
assert result is not None
assert result[0] == "AS13335"
def test_second_range(self):
result = _lookup("1.0.5.0")
assert result is not None
assert result[0] == "AS56203"
assert result[2] == "AU"
def test_google_dns(self):
result = _lookup("8.8.8.8")
assert result is not None
assert result[0] == "AS15169"
assert result[1] == "GOOGLE"
def test_miss_gap(self):
"""IP in the not-routed gap (ASN 0 range) returns None."""
result = _lookup("1.0.1.0")
assert result is None
def test_miss_below_first(self):
result = _lookup("0.255.255.255")
assert result is None
def test_miss_above_last(self):
result = _lookup("8.8.9.0")
assert result is None
def test_db_not_loaded(self):
_reset_db()
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
result = _lookup("1.0.0.0")
assert result is None
# ---------------------------------------------------------------------------
# TestCommand
# ---------------------------------------------------------------------------
class TestCommand:
def setup_method(self):
_load_sample()
def test_valid_ip(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
assert "AS13335" in bot.replied[0]
assert "CLOUDFLARENET" in bot.replied[0]
assert "(US)" in bot.replied[0]
def test_google_dns(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 8.8.8.8")))
assert "AS15169" in bot.replied[0]
assert "GOOGLE" in bot.replied[0]
def test_private_ip(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 192.168.1.1")))
assert "private/loopback" in bot.replied[0]
def test_loopback(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 127.0.0.1")))
assert "private/loopback" in bot.replied[0]
def test_invalid_input(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn notanip")))
assert "Invalid IP" in bot.replied[0]
def test_no_args(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn")))
assert "Usage:" in bot.replied[0]
def test_ipv6_rejected(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 2606:4700::1")))
assert "only IPv4" in bot.replied[0]
def test_no_match(self):
bot = _FakeBot()
asyncio.run(cmd_asn(bot, _msg("!asn 200.200.200.200")))
assert "no ASN data" in bot.replied[0]
def test_db_missing(self):
_reset_db()
bot = _FakeBot()
with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
asyncio.run(cmd_asn(bot, _msg("!asn 1.0.0.1")))
assert "not available" in bot.replied[0]