"""Plugin: ASN lookup using iptoasn.com TSV database.""" from __future__ import annotations import ipaddress import logging import struct from bisect import bisect_right from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) _DB_PATH = Path("data/ip2asn-v4.tsv") # Sorted parallel arrays populated by _load_db(): # _starts[i] = start IP as 32-bit int # _ends[i] = end IP as 32-bit int # _asns[i] = "AS" # _countries[i] = two-letter country code # _orgs[i] = AS description string _starts: list[int] = [] _ends: list[int] = [] _asns: list[str] = [] _countries: list[str] = [] _orgs: list[str] = [] _loaded = False def _ip_to_int(addr: str) -> int: """Convert dotted-quad IPv4 string to 32-bit unsigned integer.""" return struct.unpack("!I", ipaddress.IPv4Address(addr).packed)[0] def _load_db(path: Path | None = None) -> bool: """Load the iptoasn TSV into sorted arrays. Returns True if loaded successfully, False otherwise. Rows with ASN 0 ("Not routed") are skipped. """ global _loaded p = path or _DB_PATH if not p.is_file(): log.warning("asn: %s not found (run update-data)", p) return False starts: list[int] = [] ends: list[int] = [] asns: list[str] = [] countries: list[str] = [] orgs: list[str] = [] with open(p, encoding="utf-8", errors="replace") as fh: for line in fh: line = line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) < 5: continue asn_num = parts[2] if asn_num == "0": continue try: start = _ip_to_int(parts[0]) end = _ip_to_int(parts[1]) except (ValueError, struct.error): continue starts.append(start) ends.append(end) asns.append(f"AS{asn_num}") countries.append(parts[3]) orgs.append(parts[4]) _starts.clear() _ends.clear() _asns.clear() _countries.clear() _orgs.clear() _starts.extend(starts) _ends.extend(ends) _asns.extend(asns) _countries.extend(countries) _orgs.extend(orgs) _loaded = True log.info("asn: loaded %d ranges from %s", len(_starts), p) return True def _lookup(addr: str) -> tuple[str, str, str] | None: """Look up an IPv4 address in the loaded database. Returns (asn, org, country) or None if not found. """ if not _loaded: if not _load_db(): return None ip_int = _ip_to_int(addr) idx = bisect_right(_starts, ip_int) - 1 if idx < 0: return None if ip_int > _ends[idx]: return None return _asns[idx], _orgs[idx], _countries[idx] @command("asn", help="ASN lookup: !asn ") async def cmd_asn(bot, message): """Look up the Autonomous System Number for an IP address. Usage: !asn 8.8.8.8 """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !asn ") return addr = parts[1] try: ip = ipaddress.ip_address(addr) except ValueError: await bot.reply(message, f"Invalid IP address: {addr}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{addr}: private/loopback address") return if ip.version != 4: await bot.reply(message, f"{addr}: only IPv4 supported") return result = _lookup(str(ip)) if result is None: if not _loaded: await bot.reply(message, "ASN database not available (run update-data)") else: await bot.reply(message, f"{addr}: no ASN data") return asn, org, country = result await bot.reply(message, f"{addr}: {asn} {org} ({country})")