"""Plugin: ASN lookup using MaxMind GeoLite2-ASN mmdb.""" from __future__ import annotations import ipaddress import logging from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) _DB_PATHS = [ Path("data/GeoLite2-ASN.mmdb"), Path("/usr/share/GeoIP/GeoLite2-ASN.mmdb"), Path.home() / ".local" / "share" / "GeoIP" / "GeoLite2-ASN.mmdb", ] _reader = None def _get_reader(): """Lazy-load the mmdb reader.""" global _reader if _reader is not None: return _reader try: import maxminddb except ImportError: log.error("maxminddb package not installed") return None for path in _DB_PATHS: if path.is_file(): _reader = maxminddb.open_database(str(path)) log.info("asn: loaded %s", path) return _reader log.warning("asn: no GeoLite2-ASN.mmdb found") return None @command("asn", help="ASN lookup: !asn ") async def cmd_asn(bot, message): """Look up the Autonomous System Number for an IP address. Usage: !asn 8.8.8.8 """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !asn ") return addr = parts[1] try: ip = ipaddress.ip_address(addr) except ValueError: await bot.reply(message, f"Invalid IP address: {addr}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{addr}: private/loopback address") return reader = _get_reader() if reader is None: await bot.reply(message, "ASN database not available (run update-data)") return try: rec = reader.get(str(ip)) except Exception as exc: await bot.reply(message, f"Lookup error: {exc}") return if not rec: await bot.reply(message, f"{addr}: no ASN data") return asn = rec.get("autonomous_system_number", "") org = rec.get("autonomous_system_organization", "") if asn: await bot.reply(message, f"{addr}: AS{asn} ({org})" if org else f"{addr}: AS{asn}") else: await bot.reply(message, f"{addr}: no ASN data")