"""Plugin: subdomain enumeration via crt.sh + DNS brute force.""" from __future__ import annotations import asyncio import json import logging import re import socket import urllib.request from derp.dns import TOR_DNS_ADDR, TOR_DNS_PORT, build_query, parse_response from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) _CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json" _CRTSH_TIMEOUT = 30 _DNS_TIMEOUT = 3.0 _MAX_RESULTS = 20 # Built-in wordlist for brute force (common subdomain prefixes) _WORDLIST = [ "www", "mail", "ftp", "smtp", "pop", "imap", "webmail", "mx", "ns1", "ns2", "ns3", "dns", "dns1", "dns2", "dev", "staging", "stage", "test", "qa", "uat", "beta", "demo", "api", "app", "web", "portal", "admin", "panel", "dashboard", "vpn", "remote", "gateway", "proxy", "cdn", "static", "assets", "media", "img", "images", "files", "download", "upload", "db", "database", "mysql", "postgres", "redis", "mongo", "elastic", "git", "svn", "repo", "ci", "jenkins", "build", "ldap", "ad", "auth", "sso", "login", "id", "accounts", "docs", "wiki", "help", "support", "status", "monitor", "backup", "bak", "old", "new", "internal", "intranet", "corp", "shop", "store", "blog", "forum", "crm", "erp", "cloud", "aws", "s3", "gcp", "azure", "mx1", "mx2", "relay", "smtp2", "autodiscover", ] _DOMAIN_RE = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$") def _resolve_a(name: str) -> list[str]: """Blocking DNS A lookup via Tor resolver. Returns list of IPs.""" query = build_query(name, 1) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(_DNS_TIMEOUT) try: sock.sendto(query, (TOR_DNS_ADDR, TOR_DNS_PORT)) data = sock.recv(4096) rcode, results = parse_response(data) if rcode != 0: return [] # Filter to only A record IPs (dotted-quad format) return [r for r in results if "." in r and ":" not in r] except (socket.timeout, OSError): return [] finally: sock.close() def _fetch_crtsh(domain: str) -> set[str]: """Fetch subdomains from crt.sh CT logs. Blocking.""" url = _CRTSH_URL.format(domain=domain) req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"}) with _urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: data = json.loads(resp.read()) subs: set[str] = set() for entry in data: name = entry.get("common_name", "").strip().lower() if name and name.endswith(f".{domain}") and "*" not in name: subs.add(name) # Also check SAN entries name_value = entry.get("name_value", "") for line in name_value.split("\n"): line = line.strip().lower() if line and line.endswith(f".{domain}") and "*" not in line: subs.add(line) return subs async def _brute_one(prefix: str, domain: str) -> tuple[str, list[str]]: """Resolve one subdomain. Returns (fqdn, [ips]).""" fqdn = f"{prefix}.{domain}" loop = asyncio.get_running_loop() ips = await loop.run_in_executor(None, _resolve_a, fqdn) return fqdn, ips @command("subdomain", help="Subdomain enum: !subdomain [brute]") async def cmd_subdomain(bot, message): """Enumerate subdomains via CT logs and optional DNS brute force. Usage: !subdomain example.com CT log lookup only !subdomain example.com brute CT + DNS brute force """ parts = message.text.split(None, 3) if len(parts) < 2: await bot.reply(message, "Usage: !subdomain [brute]") return domain = parts[1].lower().rstrip(".") brute = len(parts) > 2 and parts[2].lower() == "brute" if not _DOMAIN_RE.match(domain): await bot.reply(message, f"Invalid domain: {domain}") return await bot.reply(message, f"Enumerating subdomains for {domain}...") loop = asyncio.get_running_loop() found: dict[str, list[str]] = {} # Phase 1: crt.sh CT log lookup try: ct_subs = await asyncio.wait_for( loop.run_in_executor(None, _fetch_crtsh, domain), timeout=35.0, ) # Resolve the CT-discovered subdomains tasks = [_brute_one(sub.removesuffix(f".{domain}"), domain) for sub in ct_subs] if tasks: results = await asyncio.gather(*tasks) for fqdn, ips in results: if ips: found[fqdn] = ips except TimeoutError: await bot.reply(message, "crt.sh: timeout (continuing...)") except Exception as exc: reason = str(exc)[:60] if str(exc) else type(exc).__name__ await bot.reply(message, f"crt.sh: {reason} (continuing...)") # Phase 2: DNS brute force (optional) if brute: tasks = [_brute_one(w, domain) for w in _WORDLIST if f"{w}.{domain}" not in found] if tasks: results = await asyncio.gather(*tasks) for fqdn, ips in results: if ips: found[fqdn] = ips if not found: await bot.reply(message, f"{domain}: no subdomains found") return # Sort and output sorted_subs = sorted(found.items()) total = len(sorted_subs) shown = sorted_subs[:_MAX_RESULTS] lines = [f" {fqdn} -> {', '.join(ips)}" for fqdn, ips in shown] suffix = f" ({total - _MAX_RESULTS} more)" if total > _MAX_RESULTS else "" lines.append(f"{domain}: {total} subdomains found{suffix}") await bot.long_reply(message, lines, label="subdomains")