Drop GeoLite2-ASN.mmdb dependency (required license key) in favor of iptoasn.com ip2asn-v4.tsv (no auth, public domain). Bisect-based lookup in pure stdlib, downloaded via SOCKS5 in update-data.sh. Adds 30 test cases covering load, lookup, and command handler. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
149 lines
3.8 KiB
Python
149 lines
3.8 KiB
Python
"""Plugin: ASN lookup using iptoasn.com TSV database."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import logging
|
|
import struct
|
|
from bisect import bisect_right
|
|
from pathlib import Path
|
|
|
|
from derp.plugin import command
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
_DB_PATH = Path("data/ip2asn-v4.tsv")
|
|
|
|
# Sorted parallel arrays populated by _load_db():
|
|
# _starts[i] = start IP as 32-bit int
|
|
# _ends[i] = end IP as 32-bit int
|
|
# _asns[i] = "AS<number>"
|
|
# _countries[i] = two-letter country code
|
|
# _orgs[i] = AS description string
|
|
_starts: list[int] = []
|
|
_ends: list[int] = []
|
|
_asns: list[str] = []
|
|
_countries: list[str] = []
|
|
_orgs: list[str] = []
|
|
|
|
_loaded = False
|
|
|
|
|
|
def _ip_to_int(addr: str) -> int:
|
|
"""Convert dotted-quad IPv4 string to 32-bit unsigned integer."""
|
|
return struct.unpack("!I", ipaddress.IPv4Address(addr).packed)[0]
|
|
|
|
|
|
def _load_db(path: Path | None = None) -> bool:
|
|
"""Load the iptoasn TSV into sorted arrays.
|
|
|
|
Returns True if loaded successfully, False otherwise.
|
|
Rows with ASN 0 ("Not routed") are skipped.
|
|
"""
|
|
global _loaded
|
|
p = path or _DB_PATH
|
|
if not p.is_file():
|
|
log.warning("asn: %s not found (run update-data)", p)
|
|
return False
|
|
|
|
starts: list[int] = []
|
|
ends: list[int] = []
|
|
asns: list[str] = []
|
|
countries: list[str] = []
|
|
orgs: list[str] = []
|
|
|
|
with open(p, encoding="utf-8", errors="replace") as fh:
|
|
for line in fh:
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
parts = line.split("\t")
|
|
if len(parts) < 5:
|
|
continue
|
|
asn_num = parts[2]
|
|
if asn_num == "0":
|
|
continue
|
|
try:
|
|
start = _ip_to_int(parts[0])
|
|
end = _ip_to_int(parts[1])
|
|
except (ValueError, struct.error):
|
|
continue
|
|
starts.append(start)
|
|
ends.append(end)
|
|
asns.append(f"AS{asn_num}")
|
|
countries.append(parts[3])
|
|
orgs.append(parts[4])
|
|
|
|
_starts.clear()
|
|
_ends.clear()
|
|
_asns.clear()
|
|
_countries.clear()
|
|
_orgs.clear()
|
|
|
|
_starts.extend(starts)
|
|
_ends.extend(ends)
|
|
_asns.extend(asns)
|
|
_countries.extend(countries)
|
|
_orgs.extend(orgs)
|
|
|
|
_loaded = True
|
|
log.info("asn: loaded %d ranges from %s", len(_starts), p)
|
|
return True
|
|
|
|
|
|
def _lookup(addr: str) -> tuple[str, str, str] | None:
|
|
"""Look up an IPv4 address in the loaded database.
|
|
|
|
Returns (asn, org, country) or None if not found.
|
|
"""
|
|
if not _loaded:
|
|
if not _load_db():
|
|
return None
|
|
|
|
ip_int = _ip_to_int(addr)
|
|
idx = bisect_right(_starts, ip_int) - 1
|
|
if idx < 0:
|
|
return None
|
|
if ip_int > _ends[idx]:
|
|
return None
|
|
return _asns[idx], _orgs[idx], _countries[idx]
|
|
|
|
|
|
@command("asn", help="ASN lookup: !asn <ip>")
|
|
async def cmd_asn(bot, message):
|
|
"""Look up the Autonomous System Number for an IP address.
|
|
|
|
Usage:
|
|
!asn 8.8.8.8
|
|
"""
|
|
parts = message.text.split(None, 2)
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !asn <ip>")
|
|
return
|
|
|
|
addr = parts[1]
|
|
try:
|
|
ip = ipaddress.ip_address(addr)
|
|
except ValueError:
|
|
await bot.reply(message, f"Invalid IP address: {addr}")
|
|
return
|
|
|
|
if ip.is_private or ip.is_loopback:
|
|
await bot.reply(message, f"{addr}: private/loopback address")
|
|
return
|
|
|
|
if ip.version != 4:
|
|
await bot.reply(message, f"{addr}: only IPv4 supported")
|
|
return
|
|
|
|
result = _lookup(str(ip))
|
|
if result is None:
|
|
if not _loaded:
|
|
await bot.reply(message, "ASN database not available (run update-data)")
|
|
else:
|
|
await bot.reply(message, f"{addr}: no ASN data")
|
|
return
|
|
|
|
asn, org, country = result
|
|
await bot.reply(message, f"{addr}: {asn} {org} ({country})")
|