Files
derp/plugins/headers.py
user e1b57e1764 feat: add wave 4 plugins (opslog, note, subdomain, headers)
Opslog: timestamped operational log per channel with add, list,
search, and delete. SQLite-backed, admin-only clear.

Note: persistent per-channel key-value store with set, get, del,
list, clear. SQLite-backed, admin-only clear.

Subdomain: enumeration via crt.sh CT log query with optional DNS
brute force using a built-in 80-word prefix wordlist. Resolves
discovered subdomains concurrently.

Headers: HTTP header fingerprinting against 50+ signature patterns.
Detects servers, frameworks, CDNs, and security headers (HSTS, CSP,
XFO, etc).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 02:48:16 +01:00

184 lines
5.8 KiB
Python

"""Plugin: HTTP header fingerprinting with local signature patterns."""
from __future__ import annotations
import asyncio
import logging
import re
import ssl
import urllib.request
from derp.plugin import command
log = logging.getLogger(__name__)
_TIMEOUT = 10
_USER_AGENT = "Mozilla/5.0 (compatible; derp-bot/1.0)"
# -- Signature database -------------------------------------------------------
# Each entry: (header_name, pattern_regex, technology_label)
# Patterns are case-insensitive.
_SIGNATURES: list[tuple[str, str, str]] = [
# Web servers
("Server", r"Apache/?(\S+)?", "Apache {0}"),
("Server", r"nginx/?(\S+)?", "nginx {0}"),
("Server", r"Microsoft-IIS/?(\S+)?", "IIS {0}"),
("Server", r"LiteSpeed", "LiteSpeed"),
("Server", r"Caddy", "Caddy"),
("Server", r"openresty/?(\S+)?", "OpenResty {0}"),
("Server", r"Cowboy", "Cowboy (Erlang)"),
("Server", r"gunicorn/?(\S+)?", "Gunicorn {0}"),
("Server", r"uvicorn", "Uvicorn"),
("Server", r"Werkzeug/?(\S+)?", "Werkzeug {0}"),
("Server", r"Kestrel", "Kestrel (.NET)"),
("Server", r"Jetty", "Jetty (Java)"),
# Frameworks / languages
("X-Powered-By", r"PHP/?(\S+)?", "PHP {0}"),
("X-Powered-By", r"ASP\.NET", "ASP.NET"),
("X-Powered-By", r"Express", "Express (Node.js)"),
("X-Powered-By", r"Next\.js", "Next.js"),
("X-Powered-By", r"Phusion Passenger", "Passenger"),
("X-Powered-By", r"Django", "Django"),
("X-Powered-By", r"Flask", "Flask"),
("X-AspNet-Version", r"(\S+)", "ASP.NET {0}"),
("X-Drupal-Cache", r".*", "Drupal"),
("X-Generator", r"WordPress", "WordPress"),
("X-Generator", r"Drupal", "Drupal"),
("X-Shopify-Stage", r".*", "Shopify"),
("X-Wix-Request-Id", r".*", "Wix"),
# CDN / proxy
("CF-RAY", r".*", "Cloudflare"),
("X-Cache", r".*cloudfront.*", "CloudFront"),
("X-Served-By", r".*cache.*", "Fastly/Varnish"),
("X-Varnish", r".*", "Varnish"),
("Via", r".*varnish.*", "Varnish"),
("Via", r".*cloudfront.*", "CloudFront"),
("X-Akamai-Transformed", r".*", "Akamai"),
("X-Azure-Ref", r".*", "Azure CDN"),
("X-Vercel-Id", r".*", "Vercel"),
("X-Netlify-Request-Id", r".*", "Netlify"),
("Fly-Request-Id", r".*", "Fly.io"),
# Security headers (presence check)
("Strict-Transport-Security", r".*", "HSTS"),
("Content-Security-Policy", r".*", "CSP"),
("X-Content-Type-Options", r"nosniff", "X-CTO"),
("X-Frame-Options", r".*", "XFO"),
("X-XSS-Protection", r".*", "X-XSS"),
("Permissions-Policy", r".*", "Permissions-Policy"),
("Referrer-Policy", r".*", "Referrer-Policy"),
]
# Compile patterns once
_COMPILED: list[tuple[str, re.Pattern, str]] = []
for _hdr, _pat, _label in _SIGNATURES:
_COMPILED.append((_hdr.lower(), re.compile(_pat, re.IGNORECASE), _label))
def _fetch_headers(url: str) -> tuple[dict[str, str], str]:
"""Blocking HEAD request. Returns (headers_dict, error_str)."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
opener = urllib.request.build_opener(
urllib.request.HTTPSHandler(context=ctx),
)
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", _USER_AGENT)
try:
resp = opener.open(req, timeout=_TIMEOUT)
hdrs = {k.lower(): v for k, v in resp.headers.items()}
resp.close()
return hdrs, ""
except urllib.error.HTTPError as exc:
hdrs = {k.lower(): v for k, v in exc.headers.items()}
return hdrs, ""
except Exception as exc:
return {}, str(exc)[:100]
def _fingerprint(headers: dict[str, str]) -> tuple[list[str], list[str]]:
"""Match headers against signature database.
Returns (tech_list, security_list).
"""
tech: list[str] = []
security: list[str] = []
seen: set[str] = set()
for hdr_lower, pattern, label_fmt in _COMPILED:
value = headers.get(hdr_lower, "")
if not value:
continue
m = pattern.search(value)
if not m:
continue
# Format label with captured groups
groups = m.groups()
label = label_fmt
for i, g in enumerate(groups):
label = label.replace(f"{{{i}}}", g or "")
label = label.strip()
if label in seen:
continue
seen.add(label)
# Categorize: security headers vs tech
if hdr_lower in ("strict-transport-security", "content-security-policy",
"x-content-type-options", "x-frame-options",
"x-xss-protection", "permissions-policy",
"referrer-policy"):
security.append(label)
else:
tech.append(label)
return tech, security
@command("headers", help="HTTP fingerprint: !headers <url>")
async def cmd_headers(bot, message):
"""Fetch HTTP headers and fingerprint server technology.
Usage:
!headers example.com
!headers https://10.0.0.1:8080
"""
parts = message.text.split(None, 2)
if len(parts) < 2:
await bot.reply(message, "Usage: !headers <url>")
return
url = parts[1]
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
loop = asyncio.get_running_loop()
headers, error = await loop.run_in_executor(None, _fetch_headers, url)
if error:
await bot.reply(message, f"{url} -> error: {error}")
return
if not headers:
await bot.reply(message, f"{url} -> no headers received")
return
tech, security = _fingerprint(headers)
out = []
if tech:
out.append(f"Tech: {', '.join(tech)}")
if security:
out.append(f"Security: {', '.join(security)}")
if not out:
out.append("No signatures matched")
await bot.reply(message, f"{url} -> {' | '.join(out)}")