"""Plugin: DNSBL/RBL IP reputation check via Tor DNS resolver.""" from __future__ import annotations import asyncio import ipaddress import socket from derp.dns import TOR_DNS_ADDR, TOR_DNS_PORT, build_query, parse_response from derp.plugin import command _DNSBLS = [ ("zen.spamhaus.org", "Spamhaus"), ("bl.spamcop.net", "SpamCop"), ("b.barracudacentral.org", "Barracuda"), ("dnsbl.sorbs.net", "SORBS"), ("spam.dnsbl.sorbs.net", "SORBS-Spam"), ("cbl.abuseat.org", "CBL"), ("dnsbl-1.uceprotect.net", "UCEPROTECT-1"), ("psbl.surriel.com", "PSBL"), ("dyna.spamrats.com", "SpamRats"), ("all.s5h.net", "S5H"), ] _TIMEOUT = 5.0 def _query_dnsbl(name: str) -> bool: """Blocking DNS A lookup via Tor resolver, returns True if listed.""" query = build_query(name, 1) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(_TIMEOUT) try: sock.sendto(query, (TOR_DNS_ADDR, TOR_DNS_PORT)) data = sock.recv(512) rcode, results = parse_response(data) return rcode == 0 and len(results) > 0 except (socket.timeout, OSError): return False finally: sock.close() def _reversed_ip(addr: str) -> str: """Reverse an IPv4 address for DNSBL lookup.""" return ".".join(reversed(addr.split("."))) async def _check_one(ip_rev: str, zone: str, label: str) -> tuple[str, bool]: """Check one DNSBL asynchronously.""" name = f"{ip_rev}.{zone}" loop = asyncio.get_running_loop() listed = await loop.run_in_executor(None, _query_dnsbl, name) return label, listed @command("blacklist", help="DNSBL check: !blacklist ") async def cmd_blacklist(bot, message): """Check an IP against common DNSBL/RBL services. Usage: !blacklist 1.2.3.4 """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !blacklist ") return addr = parts[1] try: ip = ipaddress.IPv4Address(addr) except ValueError: await bot.reply(message, f"Invalid IPv4 address: {addr}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{addr}: private/loopback address, skipping DNSBL") return ip_rev = _reversed_ip(str(ip)) tasks = [_check_one(ip_rev, zone, label) for zone, label in _DNSBLS] results = await asyncio.gather(*tasks) listed = [label for label, hit in results if hit] clean = [label for label, hit in results if not hit] if listed: await bot.reply(message, f"{addr} LISTED on: {', '.join(listed)} " f"({len(clean)}/{len(results)} clean)") else: await bot.reply(message, f"{addr} clean on all {len(results)} DNSBLs")