"""Plugin: subdomain enumeration via crt.sh + DNS brute force.""" from __future__ import annotations import asyncio import ipaddress import json import logging import os import re import socket import struct import urllib.request from derp.plugin import command log = logging.getLogger(__name__) _CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json" _CRTSH_TIMEOUT = 30 _DNS_TIMEOUT = 3.0 _MAX_RESULTS = 20 # Built-in wordlist for brute force (common subdomain prefixes) _WORDLIST = [ "www", "mail", "ftp", "smtp", "pop", "imap", "webmail", "mx", "ns1", "ns2", "ns3", "dns", "dns1", "dns2", "dev", "staging", "stage", "test", "qa", "uat", "beta", "demo", "api", "app", "web", "portal", "admin", "panel", "dashboard", "vpn", "remote", "gateway", "proxy", "cdn", "static", "assets", "media", "img", "images", "files", "download", "upload", "db", "database", "mysql", "postgres", "redis", "mongo", "elastic", "git", "svn", "repo", "ci", "jenkins", "build", "ldap", "ad", "auth", "sso", "login", "id", "accounts", "docs", "wiki", "help", "support", "status", "monitor", "backup", "bak", "old", "new", "internal", "intranet", "corp", "shop", "store", "blog", "forum", "crm", "erp", "cloud", "aws", "s3", "gcp", "azure", "mx1", "mx2", "relay", "smtp2", "autodiscover", ] _DOMAIN_RE = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$") def _get_resolver() -> str: """Read first IPv4 nameserver from /etc/resolv.conf.""" try: with open("/etc/resolv.conf") as f: for line in f: line = line.strip() if line.startswith("nameserver"): addr = line.split()[1] try: ipaddress.IPv4Address(addr) return addr except ValueError: continue except (OSError, IndexError): pass return "8.8.8.8" def _build_a_query(name: str) -> bytes: """Build a minimal DNS A query.""" tid = os.urandom(2) flags = struct.pack("!H", 0x0100) counts = struct.pack("!HHHH", 1, 0, 0, 0) encoded = b"" for label in name.rstrip(".").split("."): encoded += bytes([len(label)]) + label.encode("ascii") encoded += b"\x00" return tid + flags + counts + encoded + struct.pack("!HH", 1, 1) def _parse_a_response(data: bytes) -> list[str]: """Extract A record IPs from a DNS response.""" if len(data) < 12: return [] _, flags, _, ancount = struct.unpack_from("!HHHH", data, 0) rcode = flags & 0x0F if rcode != 0 or ancount == 0: return [] offset = 12 # Skip question section while offset < len(data) and data[offset] != 0: if (data[offset] & 0xC0) == 0xC0: offset += 2 break offset += data[offset] + 1 else: offset += 1 offset += 4 # QTYPE + QCLASS results = [] for _ in range(ancount): if offset + 12 > len(data): break # Skip name (may be pointer) if (data[offset] & 0xC0) == 0xC0: offset += 2 else: while offset < len(data) and data[offset] != 0: offset += data[offset] + 1 offset += 1 rtype, _, _, rdlength = struct.unpack_from("!HHIH", data, offset) offset += 10 if rtype == 1 and rdlength == 4: results.append(socket.inet_ntoa(data[offset:offset + 4])) offset += rdlength return results def _resolve_a(name: str, server: str) -> list[str]: """Blocking DNS A lookup. Returns list of IPs.""" query = _build_a_query(name) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(_DNS_TIMEOUT) try: sock.sendto(query, (server, 53)) data = sock.recv(4096) return _parse_a_response(data) except (socket.timeout, OSError): return [] finally: sock.close() def _fetch_crtsh(domain: str) -> set[str]: """Fetch subdomains from crt.sh CT logs. Blocking.""" url = _CRTSH_URL.format(domain=domain) req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: # noqa: S310 data = json.loads(resp.read()) subs: set[str] = set() for entry in data: name = entry.get("common_name", "").strip().lower() if name and name.endswith(f".{domain}") and "*" not in name: subs.add(name) # Also check SAN entries name_value = entry.get("name_value", "") for line in name_value.split("\n"): line = line.strip().lower() if line and line.endswith(f".{domain}") and "*" not in line: subs.add(line) return subs async def _brute_one(prefix: str, domain: str, server: str) -> tuple[str, list[str]]: """Resolve one subdomain. Returns (fqdn, [ips]).""" fqdn = f"{prefix}.{domain}" loop = asyncio.get_running_loop() ips = await loop.run_in_executor(None, _resolve_a, fqdn, server) return fqdn, ips @command("subdomain", help="Subdomain enum: !subdomain [brute]") async def cmd_subdomain(bot, message): """Enumerate subdomains via CT logs and optional DNS brute force. Usage: !subdomain example.com CT log lookup only !subdomain example.com brute CT + DNS brute force """ parts = message.text.split(None, 3) if len(parts) < 2: await bot.reply(message, "Usage: !subdomain [brute]") return domain = parts[1].lower().rstrip(".") brute = len(parts) > 2 and parts[2].lower() == "brute" if not _DOMAIN_RE.match(domain): await bot.reply(message, f"Invalid domain: {domain}") return await bot.reply(message, f"Enumerating subdomains for {domain}...") loop = asyncio.get_running_loop() found: dict[str, list[str]] = {} # Phase 1: crt.sh CT log lookup try: ct_subs = await asyncio.wait_for( loop.run_in_executor(None, _fetch_crtsh, domain), timeout=35.0, ) # Resolve the CT-discovered subdomains server = _get_resolver() tasks = [_brute_one(sub.removesuffix(f".{domain}"), domain, server) for sub in ct_subs] if tasks: results = await asyncio.gather(*tasks) for fqdn, ips in results: if ips: found[fqdn] = ips except TimeoutError: await bot.reply(message, "crt.sh: timeout (continuing...)") except Exception as exc: reason = str(exc)[:60] if str(exc) else type(exc).__name__ await bot.reply(message, f"crt.sh: {reason} (continuing...)") # Phase 2: DNS brute force (optional) if brute: server = _get_resolver() tasks = [_brute_one(w, domain, server) for w in _WORDLIST if f"{w}.{domain}" not in found] if tasks: results = await asyncio.gather(*tasks) for fqdn, ips in results: if ips: found[fqdn] = ips if not found: await bot.reply(message, f"{domain}: no subdomains found") return # Sort and output sorted_subs = sorted(found.items()) total = len(sorted_subs) shown = sorted_subs[:_MAX_RESULTS] for fqdn, ips in shown: await bot.reply(message, f" {fqdn} -> {', '.join(ips)}") suffix = f" ({total - _MAX_RESULTS} more)" if total > _MAX_RESULTS else "" await bot.reply(message, f"{domain}: {total} subdomains found{suffix}")