Add create_connection and open_connection helpers to the shared proxy module, covering portcheck, whois, tlscheck, and crtsh live-cert check. UDP-based plugins (dns, blacklist, subdomain) stay direct. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
120 lines
3.6 KiB
Python
120 lines
3.6 KiB
Python
"""Plugin: WHOIS lookup (raw TCP, port 43, SOCKS5-proxied)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import ipaddress
|
|
|
|
from derp.http import open_connection as _open_connection
|
|
from derp.plugin import command
|
|
|
|
# Referral servers for common TLDs
|
|
_SERVERS = {
|
|
"com": "whois.verisign-grs.com",
|
|
"net": "whois.verisign-grs.com",
|
|
"org": "whois.pir.org",
|
|
"info": "whois.afilias.net",
|
|
"io": "whois.nic.io",
|
|
"me": "whois.nic.me",
|
|
"co": "whois.nic.co",
|
|
"de": "whois.denic.de",
|
|
"uk": "whois.nic.uk",
|
|
"nl": "whois.domain-registry.nl",
|
|
"eu": "whois.eu",
|
|
"be": "whois.dns.be",
|
|
"fr": "whois.nic.fr",
|
|
"ru": "whois.tcinet.ru",
|
|
"au": "whois.auda.org.au",
|
|
"ca": "whois.cira.ca",
|
|
"us": "whois.nic.us",
|
|
}
|
|
_IP_SERVER = "whois.arin.net"
|
|
_FALLBACK = "whois.iana.org"
|
|
_TIMEOUT = 10.0
|
|
_MAX_RESPONSE = 4096
|
|
|
|
|
|
def _pick_server(target: str) -> tuple[str, str]:
    """Choose the WHOIS server for *target*.

    Returns a ``(whois_server, query_string)`` pair. The query string is
    always *target* itself, unmodified. IP address literals are routed to
    ``_IP_SERVER``; anything else is treated as a domain name and routed by
    its last label via ``_SERVERS``, falling back to ``_FALLBACK`` for
    TLDs that are not listed.
    """
    try:
        ipaddress.ip_address(target)
    except ValueError:
        # Not an IP literal: treat as a domain. Any trailing root dot is
        # ignored when extracting the TLD (text after the last ".").
        suffix = target.rstrip(".").rpartition(".")[2].lower()
        return _SERVERS.get(suffix, _FALLBACK), target
    return _IP_SERVER, target
|
|
|
|
|
|
async def _whois(server: str, query: str) -> str:
    """Send a WHOIS query and return the response text.

    Opens a connection to *server* on port 43 through ``_open_connection``
    (presumably returning a reader/writer pair like
    ``asyncio.open_connection`` -- confirm against ``derp.http``), writes
    *query* followed by CRLF, and reads the reply.

    The reply is accumulated until the server closes the connection,
    ``_MAX_RESPONSE`` bytes have been received, or ``_TIMEOUT`` seconds have
    elapsed since the read started -- whichever comes first. Bytes are
    decoded as UTF-8 with undecodable sequences replaced.

    Raises:
        asyncio.TimeoutError: connecting timed out, or no response data at
            all arrived within ``_TIMEOUT`` seconds.
        OSError: the connection or the transfer failed.
    """
    reader, writer = await asyncio.wait_for(
        _open_connection(server, 43, timeout=_TIMEOUT), timeout=_TIMEOUT,
    )
    try:
        writer.write(f"{query}\r\n".encode("utf-8"))
        await writer.drain()

        # A single read() returns as soon as *any* bytes are buffered, so a
        # reply that arrives in several TCP segments would be cut short.
        # Keep reading until EOF, the size cap, or one shared deadline.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + _TIMEOUT
        received = bytearray()
        while len(received) < _MAX_RESPONSE:
            budget = deadline - loop.time()
            if budget <= 0:
                break
            try:
                chunk = await asyncio.wait_for(
                    reader.read(_MAX_RESPONSE - len(received)),
                    timeout=budget,
                )
            except asyncio.TimeoutError:
                if received:
                    # Server went quiet without closing: a partial reply is
                    # more useful to the caller than a timeout error.
                    break
                raise
            if not chunk:
                # EOF: the server closed the connection, reply is complete.
                break
            received += chunk
        return received.decode("utf-8", errors="replace")
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # Teardown errors (e.g. the peer resetting the connection) must
            # not discard a response that was already read successfully.
            pass
|
|
|
|
|
|
def _extract_fields(raw: str) -> list[str]:
|
|
"""Pull key registration fields from raw WHOIS text."""
|
|
keys = {
|
|
"domain name", "registrar", "creation date", "registry expiry date",
|
|
"updated date", "name server", "status", "netname", "netrange",
|
|
"cidr", "orgname", "organization", "country",
|
|
}
|
|
results: list[str] = []
|
|
seen: set[str] = set()
|
|
for line in raw.splitlines():
|
|
line = line.strip()
|
|
if ":" not in line:
|
|
continue
|
|
key, _, val = line.partition(":")
|
|
key_lower = key.strip().lower()
|
|
val = val.strip()
|
|
if key_lower in keys and val and key_lower not in seen:
|
|
seen.add(key_lower)
|
|
results.append(f"{key.strip()}: {val}")
|
|
return results
|
|
|
|
|
|
@command("whois", help="WHOIS lookup: !whois <domain|ip>")
async def cmd_whois(bot, message):
    """Handle ``!whois <domain|ip>``: look up the target and reply.

    The second whitespace-separated word of the message is the target.
    On success the reply is up to six extracted fields joined by " | ".
    If no known field is found, the first non-empty response line that is
    not a ``%`` or ``#`` comment is sent instead (cut to 400 characters).
    Timeouts, connection errors and empty responses each get their own
    short reply.
    """
    argv = message.text.split(None, 2)
    if len(argv) < 2:
        await bot.reply(message, "Usage: !whois <domain|ip>")
        return

    target = argv[1].lower().strip()
    server, query = _pick_server(target)

    try:
        raw = await _whois(server, query)
    except (TimeoutError, asyncio.TimeoutError):
        await bot.reply(message, f"{target}: timeout ({server})")
        return
    except OSError as exc:
        await bot.reply(message, f"{target}: connection error: {exc}")
        return

    summary = _extract_fields(raw)
    if summary:
        await bot.reply(message, " | ".join(summary[:6]))
        return

    # No recognized fields: fall back to the first meaningful line, skipping
    # blank lines and server comment lines.
    stripped_lines = (text.strip() for text in raw.splitlines())
    first_line = next(
        (text for text in stripped_lines
         if text and not text.startswith(("%", "#"))),
        None,
    )
    if first_line is None:
        await bot.reply(message, f"{target}: no data from {server}")
    else:
        await bot.reply(message, first_line[:400])
|