Extract shared DNS wire-format helpers into src/derp/dns.py so both the UDP plugin (dns.py) and the new TCP plugin (tdns.py) share the same encode/decode/build/parse logic. The !tdns command routes queries through the SOCKS5 proxy via derp.http.open_connection, using TCP framing (2-byte length prefix). Default server: 1.1.1.1. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
97 lines
2.8 KiB
Python
97 lines
2.8 KiB
Python
"""Plugin: DNS record lookup over TCP (SOCKS5-proxied)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import ipaddress
|
|
import struct
|
|
|
|
from derp.dns import (
|
|
QTYPES,
|
|
RCODES,
|
|
build_query,
|
|
parse_response,
|
|
reverse_name,
|
|
)
|
|
from derp.http import open_connection as _open_connection
|
|
from derp.plugin import command
|
|
|
|
_DEFAULT_SERVER = "1.1.1.1"
|
|
_TIMEOUT = 5.0
|
|
|
|
|
|
async def _query_tcp(name: str, qtype: int, server: str,
|
|
timeout: float = _TIMEOUT) -> tuple[int, list[str]]:
|
|
"""Send a DNS query over TCP and return (rcode, [values])."""
|
|
reader, writer = await asyncio.wait_for(
|
|
_open_connection(server, 53, timeout=timeout), timeout=timeout,
|
|
)
|
|
try:
|
|
pkt = build_query(name, qtype)
|
|
writer.write(struct.pack("!H", len(pkt)) + pkt)
|
|
await writer.drain()
|
|
length = struct.unpack("!H", await reader.readexactly(2))[0]
|
|
data = await reader.readexactly(length)
|
|
return parse_response(data)
|
|
finally:
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
|
|
@command("tdns", help="TCP DNS lookup: !tdns <target> [type] [@server]")
|
|
async def cmd_tdns(bot, message):
|
|
"""Query DNS records over TCP (routed through SOCKS5 proxy)."""
|
|
parts = message.text.split()
|
|
if len(parts) < 2:
|
|
await bot.reply(message, "Usage: !tdns <domain|ip> [type] [@server]")
|
|
return
|
|
|
|
target = parts[1]
|
|
qtype_str = None
|
|
server = _DEFAULT_SERVER
|
|
|
|
for arg in parts[2:]:
|
|
if arg.startswith("@"):
|
|
server = arg[1:]
|
|
elif qtype_str is None:
|
|
qtype_str = arg.upper()
|
|
|
|
# Auto-detect: IP -> PTR, domain -> A
|
|
if qtype_str is None:
|
|
try:
|
|
ipaddress.ip_address(target)
|
|
qtype_str = "PTR"
|
|
except ValueError:
|
|
qtype_str = "A"
|
|
|
|
qtype = QTYPES.get(qtype_str)
|
|
if qtype is None:
|
|
valid = ", ".join(sorted(QTYPES))
|
|
await bot.reply(message, f"Unknown type: {qtype_str} (valid: {valid})")
|
|
return
|
|
|
|
lookup = target
|
|
if qtype_str == "PTR":
|
|
try:
|
|
lookup = reverse_name(target)
|
|
except ValueError:
|
|
await bot.reply(message, f"Invalid IP for PTR: {target}")
|
|
return
|
|
|
|
try:
|
|
rcode, results = await _query_tcp(lookup, qtype, server)
|
|
except (TimeoutError, asyncio.TimeoutError):
|
|
await bot.reply(message, f"{target} {qtype_str}: timeout")
|
|
return
|
|
except OSError as exc:
|
|
await bot.reply(message, f"{target} {qtype_str}: connection error: {exc}")
|
|
return
|
|
|
|
if rcode != 0:
|
|
err = RCODES.get(rcode, f"error {rcode}")
|
|
await bot.reply(message, f"{target} {qtype_str}: {err}")
|
|
elif not results:
|
|
await bot.reply(message, f"{target} {qtype_str}: no records")
|
|
else:
|
|
await bot.reply(message, f"{target} {qtype_str}: {', '.join(results)}")
|