Files
derp/plugins/subdomain.py
user e1b57e1764 feat: add wave 4 plugins (opslog, note, subdomain, headers)
Opslog: timestamped operational log per channel with add, list,
search, and delete. SQLite-backed, admin-only clear.

Note: persistent per-channel key-value store with set, get, del,
list, clear. SQLite-backed, admin-only clear.

Subdomain: enumeration via crt.sh CT log query with optional DNS
brute force using a built-in 80-word prefix wordlist. Resolves
discovered subdomains concurrently.

Headers: HTTP header fingerprinting against 50+ signature patterns.
Detects servers, frameworks, CDNs, and security headers (HSTS, CSP,
XFO, etc).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 02:48:16 +01:00

230 lines
7.5 KiB
Python

"""Plugin: subdomain enumeration via crt.sh + DNS brute force."""
from __future__ import annotations
import asyncio
import ipaddress
import json
import logging
import os
import re
import socket
import struct
import urllib.request
from derp.plugin import command
log = logging.getLogger(__name__)
_CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json"
_CRTSH_TIMEOUT = 30
_DNS_TIMEOUT = 3.0
_MAX_RESULTS = 20
# Built-in wordlist for brute force (common subdomain prefixes)
_WORDLIST = [
"www", "mail", "ftp", "smtp", "pop", "imap", "webmail", "mx",
"ns1", "ns2", "ns3", "dns", "dns1", "dns2",
"dev", "staging", "stage", "test", "qa", "uat", "beta", "demo",
"api", "app", "web", "portal", "admin", "panel", "dashboard",
"vpn", "remote", "gateway", "proxy", "cdn", "static", "assets",
"media", "img", "images", "files", "download", "upload",
"db", "database", "mysql", "postgres", "redis", "mongo", "elastic",
"git", "svn", "repo", "ci", "jenkins", "build",
"ldap", "ad", "auth", "sso", "login", "id", "accounts",
"docs", "wiki", "help", "support", "status", "monitor",
"backup", "bak", "old", "new", "internal", "intranet", "corp",
"shop", "store", "blog", "forum", "crm", "erp",
"cloud", "aws", "s3", "gcp", "azure",
"mx1", "mx2", "relay", "smtp2", "autodiscover",
]
_DOMAIN_RE = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$")
def _get_resolver() -> str:
"""Read first IPv4 nameserver from /etc/resolv.conf."""
try:
with open("/etc/resolv.conf") as f:
for line in f:
line = line.strip()
if line.startswith("nameserver"):
addr = line.split()[1]
try:
ipaddress.IPv4Address(addr)
return addr
except ValueError:
continue
except (OSError, IndexError):
pass
return "8.8.8.8"
def _build_a_query(name: str) -> bytes:
"""Build a minimal DNS A query."""
tid = os.urandom(2)
flags = struct.pack("!H", 0x0100)
counts = struct.pack("!HHHH", 1, 0, 0, 0)
encoded = b""
for label in name.rstrip(".").split("."):
encoded += bytes([len(label)]) + label.encode("ascii")
encoded += b"\x00"
return tid + flags + counts + encoded + struct.pack("!HH", 1, 1)
def _parse_a_response(data: bytes) -> list[str]:
"""Extract A record IPs from a DNS response."""
if len(data) < 12:
return []
_, flags, _, ancount = struct.unpack_from("!HHHH", data, 0)
rcode = flags & 0x0F
if rcode != 0 or ancount == 0:
return []
offset = 12
# Skip question section
while offset < len(data) and data[offset] != 0:
if (data[offset] & 0xC0) == 0xC0:
offset += 2
break
offset += data[offset] + 1
else:
offset += 1
offset += 4 # QTYPE + QCLASS
results = []
for _ in range(ancount):
if offset + 12 > len(data):
break
# Skip name (may be pointer)
if (data[offset] & 0xC0) == 0xC0:
offset += 2
else:
while offset < len(data) and data[offset] != 0:
offset += data[offset] + 1
offset += 1
rtype, _, _, rdlength = struct.unpack_from("!HHIH", data, offset)
offset += 10
if rtype == 1 and rdlength == 4:
results.append(socket.inet_ntoa(data[offset:offset + 4]))
offset += rdlength
return results
def _resolve_a(name: str, server: str) -> list[str]:
"""Blocking DNS A lookup. Returns list of IPs."""
query = _build_a_query(name)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(_DNS_TIMEOUT)
try:
sock.sendto(query, (server, 53))
data = sock.recv(4096)
return _parse_a_response(data)
except (socket.timeout, OSError):
return []
finally:
sock.close()
def _fetch_crtsh(domain: str) -> set[str]:
"""Fetch subdomains from crt.sh CT logs. Blocking."""
url = _CRTSH_URL.format(domain=domain)
req = urllib.request.Request(url, headers={"User-Agent": "derp-bot"})
with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp: # noqa: S310
data = json.loads(resp.read())
subs: set[str] = set()
for entry in data:
name = entry.get("common_name", "").strip().lower()
if name and name.endswith(f".{domain}") and "*" not in name:
subs.add(name)
# Also check SAN entries
name_value = entry.get("name_value", "")
for line in name_value.split("\n"):
line = line.strip().lower()
if line and line.endswith(f".{domain}") and "*" not in line:
subs.add(line)
return subs
async def _brute_one(prefix: str, domain: str,
server: str) -> tuple[str, list[str]]:
"""Resolve one subdomain. Returns (fqdn, [ips])."""
fqdn = f"{prefix}.{domain}"
loop = asyncio.get_running_loop()
ips = await loop.run_in_executor(None, _resolve_a, fqdn, server)
return fqdn, ips
@command("subdomain", help="Subdomain enum: !subdomain <domain> [brute]")
async def cmd_subdomain(bot, message):
"""Enumerate subdomains via CT logs and optional DNS brute force.
Usage:
!subdomain example.com CT log lookup only
!subdomain example.com brute CT + DNS brute force
"""
parts = message.text.split(None, 3)
if len(parts) < 2:
await bot.reply(message, "Usage: !subdomain <domain> [brute]")
return
domain = parts[1].lower().rstrip(".")
brute = len(parts) > 2 and parts[2].lower() == "brute"
if not _DOMAIN_RE.match(domain):
await bot.reply(message, f"Invalid domain: {domain}")
return
await bot.reply(message, f"Enumerating subdomains for {domain}...")
loop = asyncio.get_running_loop()
found: dict[str, list[str]] = {}
# Phase 1: crt.sh CT log lookup
try:
ct_subs = await asyncio.wait_for(
loop.run_in_executor(None, _fetch_crtsh, domain),
timeout=35.0,
)
# Resolve the CT-discovered subdomains
server = _get_resolver()
tasks = [_brute_one(sub.removesuffix(f".{domain}"), domain, server)
for sub in ct_subs]
if tasks:
results = await asyncio.gather(*tasks)
for fqdn, ips in results:
if ips:
found[fqdn] = ips
except TimeoutError:
await bot.reply(message, "crt.sh: timeout (continuing...)")
except Exception as exc:
reason = str(exc)[:60] if str(exc) else type(exc).__name__
await bot.reply(message, f"crt.sh: {reason} (continuing...)")
# Phase 2: DNS brute force (optional)
if brute:
server = _get_resolver()
tasks = [_brute_one(w, domain, server) for w in _WORDLIST
if f"{w}.{domain}" not in found]
if tasks:
results = await asyncio.gather(*tasks)
for fqdn, ips in results:
if ips:
found[fqdn] = ips
if not found:
await bot.reply(message, f"{domain}: no subdomains found")
return
# Sort and output
sorted_subs = sorted(found.items())
total = len(sorted_subs)
shown = sorted_subs[:_MAX_RESULTS]
for fqdn, ips in shown:
await bot.reply(message, f" {fqdn} -> {', '.join(ips)}")
suffix = f" ({total - _MAX_RESULTS} more)" if total > _MAX_RESULTS else ""
await bot.reply(message, f"{domain}: {total} subdomains found{suffix}")