"""Plugin: bulk DNS resolution over TCP (SOCKS5-proxied)."""

from __future__ import annotations

import asyncio
import ipaddress
import struct

from derp.dns import (
    QTYPES,
    RCODES,
    build_query,
    parse_response,
    reverse_name,
)
from derp.http import open_connection as _open_connection
from derp.plugin import command

_DEFAULT_SERVER = "1.1.1.1"
_TIMEOUT = 5.0
_MAX_HOSTS = 10

# NOTE(review): the text names "host2" but has no placeholder for the first,
# required host -- confirm the intended wording before changing it.
_USAGE = "Usage: !resolve [host2 ...] [type]"


async def _exchange(reader, writer, packet: bytes) -> bytes:
    """Send one length-prefixed DNS message and return the raw reply.

    Each message on the stream is preceded by its length as a two-byte
    big-endian integer, in both directions.

    Raises:
        asyncio.IncompleteReadError: the peer closed the stream before a
            complete reply arrived.
    """
    writer.write(struct.pack("!H", len(packet)) + packet)
    await writer.drain()
    (length,) = struct.unpack("!H", await reader.readexactly(2))
    return await reader.readexactly(length)


async def _query_tcp(name: str, qtype: int, server: str,
                     timeout: float = _TIMEOUT) -> tuple[int, list[str]]:
    """Send a DNS query over TCP and return (rcode, [values]).

    Args:
        name: Query name handed to ``build_query``.
        qtype: Numeric record type handed to ``build_query``.
        server: Address of the DNS server; port 53 is always used.
        timeout: Seconds allowed for the connect and, separately, for the
            request/response exchange.

    Raises:
        asyncio.TimeoutError: connect or exchange exceeded ``timeout``.
        asyncio.IncompleteReadError: the server closed the connection early.
        OSError: connection-level failure.
    """
    reader, writer = await asyncio.wait_for(
        _open_connection(server, 53, timeout=timeout),
        timeout=timeout,
    )
    try:
        # Bound the exchange as well as the connect: a server that accepts
        # the connection and then never answers must not hang the caller.
        payload = await asyncio.wait_for(
            _exchange(reader, writer, build_query(name, qtype)),
            timeout=timeout,
        )
        return parse_response(payload)
    finally:
        writer.close()
        # Teardown is best-effort: a failure while closing must not replace
        # an answer that was already parsed, nor mask the original error.
        try:
            await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
        except (OSError, asyncio.TimeoutError):
            pass


async def _resolve_one(host: str, qtype_str: str, server: str) -> str:
    """Resolve a single host, return formatted result line.

    Never raises for lookup failures; every outcome (records, DNS error
    code, timeout, transport error) is rendered as ``"<host> -> ..."`` so
    that one bad host cannot abort a batch run under ``asyncio.gather``.

    Args:
        host: Hostname, or an IP address when ``qtype_str`` is ``"PTR"``.
        qtype_str: Record type name, looked up in ``QTYPES``.
        server: DNS server address passed to ``_query_tcp``.
    """
    qtype = QTYPES.get(qtype_str)
    if qtype is None:
        # Guard the lookup instead of handing None to build_query.
        return f"{host} -> unsupported record type {qtype_str}"

    lookup = host
    if qtype_str == "PTR":
        try:
            lookup = reverse_name(host)
        except ValueError:
            return f"{host} -> invalid IP for PTR"

    try:
        rcode, results = await _query_tcp(lookup, qtype, server)
    except (TimeoutError, asyncio.TimeoutError):
        # Must precede the OSError clause: the builtin TimeoutError is an
        # OSError subclass.
        return f"{host} -> timeout"
    except (OSError, EOFError) as exc:
        # EOFError covers asyncio.IncompleteReadError, raised when the
        # server closes the stream before sending a full reply.
        return f"{host} -> error: {exc}"

    if rcode != 0:
        err = RCODES.get(rcode, f"error {rcode}")
        return f"{host} -> {err}"
    if not results:
        return f"{host} -> no records"
    return f"{host} -> {', '.join(results)}"


def _auto_qtype(host: str) -> str:
    """Return "PTR" when *host* parses as an IP address, otherwise "A"."""
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return "A"
    return "PTR"


@command("resolve", help="Bulk DNS: !resolve [host2 ...] [type]")
async def cmd_resolve(bot, message):
    """Bulk DNS resolution via TCP through SOCKS5 proxy.

    Usage:
        !resolve example.com github.com     (A records)
        !resolve example.com AAAA           (specific type)
        !resolve 1.2.3.4 8.8.8.8            (auto PTR)
    """
    args = message.text.split()[1:]  # drop the command word itself
    if not args:
        await bot.reply(message, _USAGE)
        return

    # A trailing argument that names a record type applies to every host.
    qtype_str = None
    if args[-1].upper() in QTYPES:
        qtype_str = args[-1].upper()
        args = args[:-1]
    if not args:
        await bot.reply(message, _USAGE)
        return

    hosts = args[:_MAX_HOSTS]

    # Without an explicit type, choose one per host: PTR for IPs, A otherwise.
    lookups = [
        _resolve_one(
            host,
            qtype_str if qtype_str is not None else _auto_qtype(host),
            _DEFAULT_SERVER,
        )
        for host in hosts
    ]
    lines = list(await asyncio.gather(*lookups))

    if len(args) > _MAX_HOSTS:
        lines.append(f"(showing first {_MAX_HOSTS} of {len(args)})")
    for line in lines:
        await bot.reply(message, line)