"""Plugin: IP reputation check against Firehol blocklist feeds.""" from __future__ import annotations import ipaddress import logging import time from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) _DATA_DIR = Path("data/iprep") # Firehol feeds: (filename, url, description) _FEEDS = [ ("firehol_level1.netset", "https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/firehol_level1.netset", "Firehol L1"), ("firehol_level2.netset", "https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/firehol_level2.netset", "Firehol L2"), ("et_compromised.ipset", "https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/et_compromised.ipset", "ET Compromised"), ("dshield.netset", "https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/dshield.netset", "DShield"), ("spamhaus_drop.netset", "https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/spamhaus_drop.netset", "Spamhaus DROP"), ] _MAX_AGE = 86400 # Refresh cache after 24h # Cache: feed_name -> (set of IPs/networks, load_time) _cache: dict[str, tuple[set[str], list, float]] = {} def _parse_feed(path: Path) -> tuple[set[str], list]: """Parse a feed file into sets of IPs and CIDR networks.""" ips: set[str] = set() nets: list = [] try: for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue if "/" in line: try: nets.append(ipaddress.ip_network(line, strict=False)) except ValueError: continue else: try: ipaddress.ip_address(line) ips.add(line) except ValueError: continue except OSError: pass return ips, nets def _load_feed(name: str) -> tuple[set[str], list]: """Load a feed from local cache, refreshing if stale.""" now = time.monotonic() if name in _cache: ips, nets, loaded = _cache[name] if (now - loaded) < _MAX_AGE: return ips, nets path = _DATA_DIR / name if not path.is_file(): return set(), [] ips, nets = _parse_feed(path) _cache[name] = (ips, nets, now) return ips, nets def _check_ip(addr: str) -> list[str]: """Check an IP against all loaded feeds. Returns list of matching feed names.""" try: ip_obj = ipaddress.ip_address(addr) except ValueError: return [] hits = [] for filename, _url, label in _FEEDS: ips, nets = _load_feed(filename) if addr in ips: hits.append(label) continue for net in nets: if ip_obj in net: hits.append(label) break return hits async def _download_feeds() -> tuple[int, int]: """Download all feeds. Returns (success_count, fail_count).""" import asyncio import urllib.request _DATA_DIR.mkdir(parents=True, exist_ok=True) loop = asyncio.get_running_loop() async def _fetch_one(filename: str, url: str) -> bool: def _do(): req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 return resp.read() try: data = await loop.run_in_executor(None, _do) (_DATA_DIR / filename).write_bytes(data) return True except Exception as exc: log.error("iprep: failed to fetch %s: %s", filename, exc) return False tasks = [_fetch_one(fn, url) for fn, url, _ in _FEEDS] results = await asyncio.gather(*tasks) # Clear cache to force reload _cache.clear() ok = sum(1 for r in results if r) return ok, len(results) - ok @command("iprep", help="IP reputation: !iprep ") async def cmd_iprep(bot, message): """Check IP against Firehol/ET blocklist feeds. Usage: !iprep 1.2.3.4 Check IP reputation !iprep update Download latest feeds """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !iprep ") return arg = parts[1].strip() if arg == "update": await bot.reply(message, f"Downloading {len(_FEEDS)} feeds...") ok, fail = await _download_feeds() msg = f"Updated: {ok}/{len(_FEEDS)} feeds" if fail: msg += f" ({fail} failed)" await bot.reply(message, msg) return try: ip = ipaddress.ip_address(arg) except ValueError: await bot.reply(message, f"Invalid IP address: {arg}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{arg}: private/loopback address") return # Check if any feeds are loaded has_data = any((_DATA_DIR / fn).is_file() for fn, _, _ in _FEEDS) if not has_data: await bot.reply(message, "No feeds loaded (run !iprep update)") return hits = _check_ip(str(ip)) if hits: await bot.reply(message, f"{arg}: LISTED on {', '.join(hits)} " f"({len(hits)}/{len(_FEEDS)} feeds)") else: await bot.reply(message, f"{arg}: clean ({len(_FEEDS)} feeds checked)")