diff --git a/TASKS.md b/TASKS.md index 112985e..7fc3a74 100644 --- a/TASKS.md +++ b/TASKS.md @@ -1,6 +1,16 @@ # derp - Tasks -## Current Sprint -- v1.2.7 Subscription Plugin Enrichment (2026-02-19) +## Current Sprint -- v1.2.8 ASN Backend Replacement (2026-02-19) + +| Pri | Status | Task | +|-----|--------|------| +| P0 | [x] | Replace MaxMind ASN with iptoasn.com TSV backend (no license key) | +| P0 | [x] | Bisect-based lookup in `plugins/asn.py` (pure stdlib) | +| P1 | [x] | `update_asn()` in `scripts/update-data.sh` (SOCKS5 download) | +| P2 | [x] | Tests: load, lookup, command handler (30 cases, 906 total) | +| P2 | [x] | Documentation update (USAGE.md data directory layout) | + +## Previous Sprint -- v1.2.7 Subscription Plugin Enrichment (2026-02-19) | Pri | Status | Task | |-----|--------|------| diff --git a/plugins/asn.py b/plugins/asn.py index ffdd491..5d169d5 100644 --- a/plugins/asn.py +++ b/plugins/asn.py @@ -1,41 +1,112 @@ -"""Plugin: ASN lookup using MaxMind GeoLite2-ASN mmdb.""" +"""Plugin: ASN lookup using iptoasn.com TSV database.""" from __future__ import annotations import ipaddress import logging +import struct +from bisect import bisect_right from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) -_DB_PATHS = [ - Path("data/GeoLite2-ASN.mmdb"), - Path("/usr/share/GeoIP/GeoLite2-ASN.mmdb"), - Path.home() / ".local" / "share" / "GeoIP" / "GeoLite2-ASN.mmdb", -] +_DB_PATH = Path("data/ip2asn-v4.tsv") -_reader = None +# Sorted parallel arrays populated by _load_db(): +# _starts[i] = start IP as 32-bit int +# _ends[i] = end IP as 32-bit int +# _asns[i] = "AS" +# _countries[i] = two-letter country code +# _orgs[i] = AS description string +_starts: list[int] = [] +_ends: list[int] = [] +_asns: list[str] = [] +_countries: list[str] = [] +_orgs: list[str] = [] + +_loaded = False -def _get_reader(): - """Lazy-load the mmdb reader.""" - global _reader - if _reader is not None: - return _reader - try: - import maxminddb 
def _ip_to_int(addr: str) -> int:
    """Convert a dotted-quad IPv4 string to its 32-bit unsigned integer value.

    Raises:
        ValueError: if *addr* is not a valid IPv4 address
            (``ipaddress.AddressValueError`` is a ``ValueError`` subclass).
    """
    return int(ipaddress.IPv4Address(addr))


def _load_db(path: Path | None = None) -> bool:
    """Load the iptoasn TSV into the module-level parallel arrays.

    Each data row is ``start<TAB>end<TAB>asn<TAB>country<TAB>description``.
    The following rows are skipped: blank lines, ``#`` comments, rows with
    fewer than five fields, rows whose addresses do not parse, rows whose
    end address is below the start address, and rows with ASN 0
    ("Not routed").

    Rows are sorted by start address before being published, because
    ``_lookup()`` bisects ``_starts`` and bisection is only correct on a
    sorted list.

    Args:
        path: TSV file to read; defaults to ``_DB_PATH``.

    Returns:
        True if the file was read (even when it held no usable rows).
        False if the file is missing or cannot be read; in that case the
        previously loaded data, if any, is left untouched.
    """
    global _loaded
    p = _DB_PATH if path is None else path
    if not p.is_file():
        log.warning("asn: %s not found (run update-data)", p)
        return False

    rows: list[tuple[int, int, str, str, str]] = []
    try:
        with open(p, encoding="utf-8", errors="replace") as fh:
            for raw in fh:
                stripped = raw.strip()
                if not stripped or stripped.startswith("#"):
                    continue
                # Strip per field, not per line: stripping the whole line
                # would eat a trailing tab and drop rows whose description
                # column is empty.
                fields = [field.strip() for field in raw.split("\t")]
                if len(fields) < 5:
                    continue
                asn_num = fields[2]
                if asn_num == "0":
                    continue
                try:
                    start = _ip_to_int(fields[0])
                    end = _ip_to_int(fields[1])
                except ValueError:
                    continue
                if end < start:
                    # An inverted range can never match a lookup.
                    continue
                rows.append((start, end, f"AS{asn_num}", fields[3], fields[4]))
    except OSError as exc:
        log.warning("asn: cannot read %s: %s", p, exc)
        return False

    # Stable sort on the start address only; already-sorted input (the
    # normal case) costs a single linear pass.
    rows.sort(key=lambda row: row[0])

    # Slice assignment replaces the contents in place, so other references
    # to these list objects keep seeing the current data.
    _starts[:] = [row[0] for row in rows]
    _ends[:] = [row[1] for row in rows]
    _asns[:] = [row[2] for row in rows]
    _countries[:] = [row[3] for row in rows]
    _orgs[:] = [row[4] for row in rows]

    _loaded = True
    log.info("asn: loaded %d ranges from %s", len(_starts), p)
    return True


def _lookup(addr: str) -> tuple[str, str, str] | None:
    """Look up an IPv4 address in the loaded database.

    The database is loaded from ``_DB_PATH`` on first use.

    Args:
        addr: dotted-quad IPv4 address.

    Returns:
        ``(asn, org, country)`` for the range containing *addr*, or None
        when no range contains it or the database could not be loaded.

    Raises:
        ValueError: if *addr* is not a valid IPv4 address.
    """
    if not _loaded and not _load_db():
        return None

    ip_int = _ip_to_int(addr)
    # Index of the last range whose start is <= ip_int (-1 if there is none).
    idx = bisect_right(_starts, ip_int) - 1
    if idx < 0 or ip_int > _ends[idx]:
        return None
    return _asns[idx], _orgs[idx], _countries[idx]
"Downloading iptoasn database..." + if curl -sS -fL --max-time 60 -o "$dest.gz" "$url" || + curl -sS -fL --socks5-hostname 127.0.0.1:1080 --max-time 60 \ + -o "$dest.gz" "$url"; then + gunzip -f "$dest.gz" + local count + count=$(wc -l < "$dest") + info "iptoasn: $count ranges" + else + rm -f "$dest.gz" + err "Failed to download iptoasn database" + ((FAILURES++)) || true + fi +} + # -- Exploit-DB CSV ----------------------------------------------------------- update_exploitdb() { local dest_dir="$DATA_DIR/exploitdb" @@ -151,6 +171,7 @@ echo update_tor update_iprep update_oui +update_asn update_exploitdb update_geolite2 diff --git a/tests/test_asn.py b/tests/test_asn.py new file mode 100644 index 0000000..23457db --- /dev/null +++ b/tests/test_asn.py @@ -0,0 +1,258 @@ +"""Tests for the ASN lookup plugin (iptoasn.com TSV backend).""" + +import asyncio +import importlib.util +import sys +import tempfile +from pathlib import Path +from unittest.mock import patch + +from derp.irc import Message + +# plugins/ is not a Python package -- load the module from file path +_spec = importlib.util.spec_from_file_location( + "plugins.asn", Path(__file__).resolve().parent.parent / "plugins" / "asn.py", +) +_mod = importlib.util.module_from_spec(_spec) +sys.modules[_spec.name] = _mod +_spec.loader.exec_module(_mod) + +from plugins.asn import ( # noqa: E402 + _ip_to_int, + _load_db, + _lookup, + cmd_asn, +) + +# -- Sample TSV data --------------------------------------------------------- + +SAMPLE_TSV = """\ +1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUDFLARENET +1.0.1.0\t1.0.3.255\t0\tNone\tNot routed +1.0.4.0\t1.0.7.255\t56203\tAU\tGTELECOM +8.8.8.0\t8.8.8.255\t15169\tUS\tGOOGLE +""" + + +# -- Helpers ----------------------------------------------------------------- + +class _FakeBot: + def __init__(self): + self.replied: list[str] = [] + + async def reply(self, message, text: str) -> None: + self.replied.append(text) + + +def _msg(text: str, nick: str = "alice", target: str = "#test") -> 
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a PRIVMSG ``Message`` from *nick* to *target* carrying *text*."""
    return Message(
        raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
        command="PRIVMSG", params=[target, text], tags={},
    )


def _reset_db():
    """Clear module-level DB state between tests."""
    _mod._starts.clear()
    _mod._ends.clear()
    _mod._asns.clear()
    _mod._countries.clear()
    _mod._orgs.clear()
    _mod._loaded = False


def _remove_file(path: Path) -> None:
    """Delete *path*; a file that is already gone is not an error."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def _write_tsv(tsv: str) -> Path:
    """Write *tsv* to a new temp file and return its path.

    The file is created with ``delete=False`` so it can be reopened by
    ``_load_db``; removal is registered with ``atexit`` so test runs do not
    leave files behind while the returned path stays valid for the whole
    session.
    """
    import atexit  # local import: only this helper needs it

    with tempfile.NamedTemporaryFile(mode="w", suffix=".tsv", delete=False) as tmp:
        tmp.write(tsv)
    path = Path(tmp.name)
    atexit.register(_remove_file, path)
    return path


def _load_sample(tsv: str = SAMPLE_TSV) -> Path:
    """Write TSV to a temp file, load it, return the path."""
    _reset_db()
    path = _write_tsv(tsv)
    _load_db(path)
    return path


# ---------------------------------------------------------------------------
# TestIpToInt
# ---------------------------------------------------------------------------

class TestIpToInt:
    """Dotted-quad to integer conversion."""

    def test_zero(self):
        assert _ip_to_int("0.0.0.0") == 0

    def test_one(self):
        assert _ip_to_int("0.0.0.1") == 1

    def test_max(self):
        assert _ip_to_int("255.255.255.255") == 0xFFFFFFFF

    def test_known(self):
        assert _ip_to_int("1.0.0.0") == 0x01000000

    def test_google_dns(self):
        assert _ip_to_int("8.8.8.8") == 0x08080808


# ---------------------------------------------------------------------------
# TestLoad
# ---------------------------------------------------------------------------

class TestLoad:
    """Parsing of the TSV file into the module-level arrays."""

    def test_loads_rows(self):
        _load_sample()
        # 4 rows in TSV, but ASN 0 is skipped -> 3 entries
        assert len(_mod._starts) == 3

    def test_skips_asn_zero(self):
        _load_sample()
        for asn in _mod._asns:
            assert asn != "AS0"

    def test_first_entry(self):
        _load_sample()
        assert _mod._asns[0] == "AS13335"
        assert _mod._orgs[0] == "CLOUDFLARENET"
        assert _mod._countries[0] == "US"

    def test_missing_file_returns_false(self):
        _reset_db()
        result = _load_db(Path("/nonexistent/path.tsv"))
        assert result is False
        assert not _mod._loaded

    def test_empty_file(self):
        _reset_db()
        result = _load_db(_write_tsv(""))
        assert result is True
        assert len(_mod._starts) == 0

    def test_skips_comments_and_blanks(self):
        tsv = "# comment\n\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
        _load_sample(tsv)
        assert len(_mod._starts) == 1

    def test_skips_malformed_rows(self):
        tsv = "bad\tdata\n1.0.0.0\t1.0.0.255\t13335\tUS\tCLOUD\n"
        _load_sample(tsv)
        assert len(_mod._starts) == 1


# ---------------------------------------------------------------------------
# TestLookup
# ---------------------------------------------------------------------------

class TestLookup:
    """Range lookups against the sample database."""

    def setup_method(self):
        _load_sample()

    def test_exact_start(self):
        result = _lookup("1.0.0.0")
        assert result is not None
        asn, org, country = result
        assert asn == "AS13335"
        assert org == "CLOUDFLARENET"
        assert country == "US"

    def test_mid_range(self):
        result = _lookup("1.0.0.128")
        assert result is not None
        assert result[0] == "AS13335"

    def test_exact_end(self):
        result = _lookup("1.0.0.255")
        assert result is not None
        assert result[0] == "AS13335"

    def test_second_range(self):
        result = _lookup("1.0.5.0")
        assert result is not None
        assert result[0] == "AS56203"
        assert result[2] == "AU"

    def test_google_dns(self):
        result = _lookup("8.8.8.8")
        assert result is not None
        assert result[0] == "AS15169"
        assert result[1] == "GOOGLE"

    def test_miss_gap(self):
        """IP in the not-routed gap (ASN 0 range) returns None."""
        result = _lookup("1.0.1.0")
        assert result is None

    def test_miss_below_first(self):
        result = _lookup("0.255.255.255")
        assert result is None

    def test_miss_above_last(self):
        result = _lookup("8.8.9.0")
        assert result is None

    def test_db_not_loaded(self):
        _reset_db()
        with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
            result = _lookup("1.0.0.0")
        assert result is None
# ---------------------------------------------------------------------------
# TestCommand
# ---------------------------------------------------------------------------

class TestCommand:
    """Checks of the ``!asn`` command handler against the sample database."""

    def setup_method(self):
        _load_sample()

    @staticmethod
    def _first_reply(line: str) -> str:
        """Run ``cmd_asn`` on *line* and return the first reply the bot sent."""
        fake = _FakeBot()
        asyncio.run(cmd_asn(fake, _msg(line)))
        return fake.replied[0]

    def test_valid_ip(self):
        reply = self._first_reply("!asn 1.0.0.1")
        for fragment in ("AS13335", "CLOUDFLARENET", "(US)"):
            assert fragment in reply

    def test_google_dns(self):
        reply = self._first_reply("!asn 8.8.8.8")
        for fragment in ("AS15169", "GOOGLE"):
            assert fragment in reply

    def test_private_ip(self):
        assert "private/loopback" in self._first_reply("!asn 192.168.1.1")

    def test_loopback(self):
        assert "private/loopback" in self._first_reply("!asn 127.0.0.1")

    def test_invalid_input(self):
        assert "Invalid IP" in self._first_reply("!asn notanip")

    def test_no_args(self):
        assert "Usage:" in self._first_reply("!asn")

    def test_ipv6_rejected(self):
        assert "only IPv4" in self._first_reply("!asn 2606:4700::1")

    def test_no_match(self):
        assert "no ASN data" in self._first_reply("!asn 200.200.200.200")

    def test_db_missing(self):
        # Start from an unloaded DB and point the module at a missing file.
        _reset_db()
        with patch.object(_mod, "_DB_PATH", Path("/nonexistent")):
            reply = self._first_reply("!asn 1.0.0.1")
        assert "not available" in reply