"""Plugin: DNSBL/RBL IP reputation check (pure stdlib).""" from __future__ import annotations import asyncio import ipaddress import os import socket import struct from derp.plugin import command _DNSBLS = [ ("zen.spamhaus.org", "Spamhaus"), ("bl.spamcop.net", "SpamCop"), ("b.barracudacentral.org", "Barracuda"), ("dnsbl.sorbs.net", "SORBS"), ("spam.dnsbl.sorbs.net", "SORBS-Spam"), ("cbl.abuseat.org", "CBL"), ("dnsbl-1.uceprotect.net", "UCEPROTECT-1"), ("psbl.surriel.com", "PSBL"), ("dyna.spamrats.com", "SpamRats"), ("all.s5h.net", "S5H"), ] _TIMEOUT = 5.0 def _get_resolver() -> str: """Read first IPv4 nameserver from /etc/resolv.conf.""" try: with open("/etc/resolv.conf") as f: for line in f: line = line.strip() if line.startswith("nameserver"): addr = line.split()[1] try: ipaddress.IPv4Address(addr) return addr except ValueError: continue except (OSError, IndexError): pass return "8.8.8.8" def _build_a_query(name: str) -> bytes: """Build a minimal DNS A query.""" tid = os.urandom(2) flags = struct.pack("!H", 0x0100) counts = struct.pack("!HHHH", 1, 0, 0, 0) encoded = b"" for label in name.rstrip(".").split("."): encoded += bytes([len(label)]) + label.encode("ascii") encoded += b"\x00" return tid + flags + counts + encoded + struct.pack("!HH", 1, 1) def _check_response(data: bytes) -> bool: """Check if DNS response has answer records (listed).""" if len(data) < 12: return False _, flags, _, ancount = struct.unpack_from("!HHHH", data, 0) rcode = flags & 0x0F return rcode == 0 and ancount > 0 def _query_dnsbl(name: str, server: str) -> bool: """Blocking DNS A lookup, returns True if listed.""" query = _build_a_query(name) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(_TIMEOUT) try: sock.sendto(query, (server, 53)) data = sock.recv(512) return _check_response(data) except (socket.timeout, OSError): return False finally: sock.close() def _reversed_ip(addr: str) -> str: """Reverse an IPv4 address for DNSBL lookup.""" return ".".join(reversed(addr.split("."))) async def _check_one(ip_rev: str, zone: str, label: str, server: str) -> tuple[str, bool]: """Check one DNSBL asynchronously.""" name = f"{ip_rev}.{zone}" loop = asyncio.get_running_loop() listed = await loop.run_in_executor(None, _query_dnsbl, name, server) return label, listed @command("blacklist", help="DNSBL check: !blacklist ") async def cmd_blacklist(bot, message): """Check an IP against common DNSBL/RBL services. Usage: !blacklist 1.2.3.4 """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !blacklist ") return addr = parts[1] try: ip = ipaddress.IPv4Address(addr) except ValueError: await bot.reply(message, f"Invalid IPv4 address: {addr}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{addr}: private/loopback address, skipping DNSBL") return ip_rev = _reversed_ip(str(ip)) server = _get_resolver() tasks = [_check_one(ip_rev, zone, label, server) for zone, label in _DNSBLS] results = await asyncio.gather(*tasks) listed = [label for label, hit in results if hit] clean = [label for label, hit in results if not hit] if listed: await bot.reply(message, f"{addr} LISTED on: {', '.join(listed)} " f"({len(clean)}/{len(results)} clean)") else: await bot.reply(message, f"{addr} clean on all {len(results)} DNSBLs")