Files
derp/plugins/whois.py
user 530f33be76 feat: add wave 2 plugins and --cprofile CLI flag
Add 7 new pure-stdlib plugins: whois (raw TCP port 43), portcheck
(async TCP connect scan with internal-net guard), httpcheck (HTTP
status/redirects/timing), tlscheck (TLS version/cipher/cert inspect),
blacklist (parallel DNSBL check against 10 RBLs), rand (password/hex/
uuid/bytes/int/coin/dice), and timer (async countdown notifications).

Add --cprofile flag to CLI for profiling bot runtime. Update all docs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 01:58:47 +01:00

119 lines
3.5 KiB
Python

"""Plugin: WHOIS lookup (raw TCP, port 43, pure stdlib)."""
from __future__ import annotations
import asyncio
import ipaddress
from derp.plugin import command
# WHOIS servers keyed by lowercase top-level domain. _pick_server() looks up
# the last label of a domain name here to decide which server to query.
_SERVERS = {
"com": "whois.verisign-grs.com",
"net": "whois.verisign-grs.com",
"org": "whois.pir.org",
"info": "whois.afilias.net",
"io": "whois.nic.io",
"me": "whois.nic.me",
"co": "whois.nic.co",
"de": "whois.denic.de",
"uk": "whois.nic.uk",
"nl": "whois.domain-registry.nl",
"eu": "whois.eu",
"be": "whois.dns.be",
"fr": "whois.nic.fr",
"ru": "whois.tcinet.ru",
"au": "whois.auda.org.au",
"ca": "whois.cira.ca",
"us": "whois.nic.us",
}
# Server queried when the target parses as an IP address.
_IP_SERVER = "whois.arin.net"
# Server queried for domains whose TLD has no entry in _SERVERS.
_FALLBACK = "whois.iana.org"
# Timeout, in seconds, given to asyncio.wait_for for WHOIS network operations.
_TIMEOUT = 10.0
# Upper bound, in bytes, on how much of a WHOIS response is read.
_MAX_RESPONSE = 4096
def _pick_server(target: str) -> tuple[str, str]:
    """Return (whois_server, query_string) for a target.

    A target that parses as an IPv4 or IPv6 address is routed to
    ``_IP_SERVER`` and queried verbatim. Anything else is treated as a
    domain name: the server is chosen from ``_SERVERS`` by the name's last
    label (case-insensitive), falling back to ``_FALLBACK`` for TLDs that
    are not listed.

    Trailing root dots of a fully-qualified name ("example.com.") are
    removed from the returned query as well as from the TLD lookup, so the
    query names the same domain that was used to pick the server.

    Args:
        target: Domain name or IP address supplied by the user.

    Returns:
        A ``(server_hostname, query)`` tuple.
    """
    try:
        ipaddress.ip_address(target)
    except ValueError:
        pass  # Not an IP literal; handle it as a domain name below.
    else:
        return _IP_SERVER, target

    # Strip the root dot(s) once and use the result for both the TLD lookup
    # and the query. If the target consists only of dots, keep it unchanged
    # so the query string is not silently emptied.
    domain = target.rstrip(".") or target
    tld = domain.rpartition(".")[2].lower()
    return _SERVERS.get(tld, _FALLBACK), domain
async def _whois(server: str, query: str) -> str:
    """Send a WHOIS query and return the response text.

    Opens a TCP connection to ``server`` on port 43, sends ``query``
    followed by CRLF, then reads the reply until the server closes the
    connection, ``_MAX_RESPONSE`` bytes have been received, or ``_TIMEOUT``
    seconds have elapsed since the query was sent, whichever comes first.

    Args:
        server: Hostname of the WHOIS server to contact.
        query: Query string to send, without a line terminator.

    Returns:
        The response decoded as UTF-8 with undecodable bytes replaced. At
        most ``_MAX_RESPONSE`` bytes are decoded. If the read deadline
        expires after some data has arrived, that partial data is returned.

    Raises:
        asyncio.TimeoutError: Connecting took longer than ``_TIMEOUT``, or
            no data at all arrived before the read deadline.
        OSError: The connection could not be established or failed.
    """
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(server, 43), timeout=_TIMEOUT,
    )
    try:
        writer.write(f"{query}\r\n".encode("utf-8"))
        await writer.drain()

        # StreamReader.read(n) returns as soon as *any* bytes are available,
        # which is frequently just the first TCP segment. Keep reading until
        # EOF or the size cap so multi-segment responses are not cut short.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + _TIMEOUT
        chunks: list[bytes] = []
        remaining = _MAX_RESPONSE
        while remaining > 0:
            try:
                chunk = await asyncio.wait_for(
                    reader.read(remaining),
                    timeout=max(deadline - loop.time(), 0.0),
                )
            except asyncio.TimeoutError:
                # A server that answers but never closes the connection
                # should still yield whatever it sent.
                if chunks:
                    break
                raise
            if not chunk:  # EOF: the server closed the connection.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks).decode("utf-8", errors="replace")
    finally:
        writer.close()
        # Best-effort shutdown: a reset or stalled close must not replace a
        # response that was already read, nor hang the caller indefinitely.
        try:
            await asyncio.wait_for(writer.wait_closed(), timeout=_TIMEOUT)
        except (OSError, asyncio.TimeoutError):
            pass
def _extract_fields(raw: str) -> list[str]:
    """Pull key registration fields from raw WHOIS text.

    Each line is split at its first colon into a label and a value. A line
    is kept when its label (compared case-insensitively) is one of the
    fields of interest, its value is non-empty, and that label has not
    already been kept. Results preserve the order of first appearance and
    are formatted as ``"<label>: <value>"`` using the label's original case.
    """
    wanted = frozenset((
        "domain name", "registrar", "creation date", "registry expiry date",
        "updated date", "name server", "status", "netname", "netrange",
        "cidr", "orgname", "organization", "country",
    ))
    # Keyed by lowercased label; dict insertion order gives the output order.
    picked: dict[str, str] = {}
    for entry in raw.splitlines():
        label, colon, value = entry.strip().partition(":")
        if not colon:
            continue
        label = label.strip()
        value = value.strip()
        norm = label.lower()
        if not value or norm not in wanted or norm in picked:
            continue
        picked[norm] = f"{label}: {value}"
    return list(picked.values())
@command("whois", help="WHOIS lookup: !whois <domain|ip>")
async def cmd_whois(bot, message):
    """Query WHOIS for a domain or IP address.

    Takes the first argument after the command word as the target, picks a
    server with ``_pick_server`` and queries it with ``_whois``. Replies
    with up to six extracted fields joined by " | ". When no fields are
    recognised, replies with the first non-empty response line that does
    not start with "%" or "#" (cut to 400 characters), or with a "no data"
    notice if there is no such line. Timeouts and connection errors are
    reported back as a reply.
    """
    argv = message.text.split(None, 2)
    if len(argv) < 2:
        await bot.reply(message, "Usage: !whois <domain|ip>")
        return

    target = argv[1].lower().strip()
    server, query = _pick_server(target)

    try:
        raw = await _whois(server, query)
    except (TimeoutError, asyncio.TimeoutError):
        failure = f"{target}: timeout ({server})"
    except OSError as exc:
        failure = f"{target}: connection error: {exc}"
    else:
        failure = None
    if failure is not None:
        await bot.reply(message, failure)
        return

    fields = _extract_fields(raw)
    if fields:
        await bot.reply(message, " | ".join(fields[:6]))
        return

    # Fallback: first meaningful line, skipping blanks and comment lines.
    stripped = (text.strip() for text in raw.splitlines())
    first = next(
        (text for text in stripped if text and not text.startswith(("%", "#"))),
        None,
    )
    if first is None:
        answer = f"{target}: no data from {server}"
    else:
        answer = first[:400]
    await bot.reply(message, answer)