"""Plugin: WHOIS lookup (raw TCP, port 43, SOCKS5-proxied).""" from __future__ import annotations import asyncio import ipaddress from derp.http import open_connection as _open_connection from derp.plugin import command # Referral servers for common TLDs _SERVERS = { "com": "whois.verisign-grs.com", "net": "whois.verisign-grs.com", "org": "whois.pir.org", "info": "whois.afilias.net", "io": "whois.nic.io", "me": "whois.nic.me", "co": "whois.nic.co", "de": "whois.denic.de", "uk": "whois.nic.uk", "nl": "whois.domain-registry.nl", "eu": "whois.eu", "be": "whois.dns.be", "fr": "whois.nic.fr", "ru": "whois.tcinet.ru", "au": "whois.auda.org.au", "ca": "whois.cira.ca", "us": "whois.nic.us", } _IP_SERVER = "whois.arin.net" _FALLBACK = "whois.iana.org" _TIMEOUT = 10.0 _MAX_RESPONSE = 4096 def _pick_server(target: str) -> tuple[str, str]: """Return (whois_server, query_string) for a target.""" try: ipaddress.ip_address(target) return _IP_SERVER, target except ValueError: pass # Domain: pick server by TLD parts = target.rstrip(".").split(".") tld = parts[-1].lower() if parts else "" server = _SERVERS.get(tld, _FALLBACK) return server, target async def _whois(server: str, query: str) -> str: """Send a WHOIS query and return the response text.""" reader, writer = await asyncio.wait_for( _open_connection(server, 43, timeout=_TIMEOUT), timeout=_TIMEOUT, ) try: writer.write(f"{query}\r\n".encode("utf-8")) await writer.drain() data = await asyncio.wait_for(reader.read(_MAX_RESPONSE), timeout=_TIMEOUT) return data.decode("utf-8", errors="replace") finally: writer.close() await writer.wait_closed() def _extract_fields(raw: str) -> list[str]: """Pull key registration fields from raw WHOIS text.""" keys = { "domain name", "registrar", "creation date", "registry expiry date", "updated date", "name server", "status", "netname", "netrange", "cidr", "orgname", "organization", "country", } results: list[str] = [] seen: set[str] = set() for line in raw.splitlines(): line = line.strip() if ":" not in line: continue key, _, val = line.partition(":") key_lower = key.strip().lower() val = val.strip() if key_lower in keys and val and key_lower not in seen: seen.add(key_lower) results.append(f"{key.strip()}: {val}") return results @command("whois", help="WHOIS lookup: !whois ") async def cmd_whois(bot, message): """Query WHOIS for a domain or IP address.""" parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !whois ") return target = parts[1].lower().strip() server, query = _pick_server(target) try: raw = await _whois(server, query) except (TimeoutError, asyncio.TimeoutError): await bot.reply(message, f"{target}: timeout ({server})") return except OSError as exc: await bot.reply(message, f"{target}: connection error: {exc}") return fields = _extract_fields(raw) if fields: await bot.reply(message, " | ".join(fields[:6])) else: # Show first non-empty line as fallback for line in raw.splitlines(): line = line.strip() if line and not line.startswith("%") and not line.startswith("#"): await bot.reply(message, line[:400]) return await bot.reply(message, f"{target}: no data from {server}")