Files
derp/plugins/crtsh.py
user 1836fa50af feat: paste overflow via FlaskPaste for long replies
Add Bot.long_reply() that sends lines directly when under threshold,
or creates a FlaskPaste paste with preview + link when over. Refactor
abuseipdb, alert history, crtsh, dork, exploitdb, and subdomain
plugins to use long_reply(). Configurable paste_threshold (default: 4).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:07:31 +01:00

186 lines
6.1 KiB
Python

"""Plugin: crt.sh certificate transparency lookup.
Query CT logs to enumerate SSL certificates for domains, report
totals (expired/valid), and flag domains still serving expired certs.
"""
import asyncio
import json
import logging
import ssl
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from derp.http import create_connection as _create_connection
from derp.http import urlopen as _urlopen
from derp.plugin import command
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Shared pool -- 3 workers keeps RPi5 happy while allowing meaningful
# parallelism (each crt.sh request blocks for seconds).
# Every blocking helper in this module is dispatched through this pool.
_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="crtsh")
# crt.sh JSON endpoint template. "%25" is a percent-encoded "%", so the
# query sent is "%.<domain>" (presumably a wildcard match on names below
# the domain -- confirm against crt.sh's query syntax).
_CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json"
# Timeout handed to the HTTP helper for a crt.sh request (seconds, assuming
# the helper follows urllib's convention).
_CRTSH_TIMEOUT = 30
# Maximum number of domains processed per !cert invocation.
_MAX_DOMAINS = 5
# -- blocking helpers (run in thread pool) ---------------------------------
def fetch_crtsh(domain: str) -> list[dict]:
    """GET crt.sh JSON for a domain. Blocking.

    The domain is percent-encoded before it is placed in the query string,
    so characters such as ``&``, ``#`` or ``/`` in user-supplied input
    cannot add, override or truncate query parameters. Ordinary host names
    (letters, digits, ``.``, ``-``) are left unchanged by the encoding.

    Args:
        domain: Domain name whose certificates should be looked up.

    Returns:
        The decoded JSON document sent back by crt.sh.

    Raises:
        Whatever the HTTP helper raises on a failed request, and
        ``ValueError`` (``json.JSONDecodeError``) if the body is not JSON.
    """
    # Local import: the module only imports urllib.request at file level.
    from urllib.parse import quote

    url = _CRTSH_URL.format(domain=quote(domain, safe=""))
    req = urllib.request.Request(url, headers={"User-Agent": "derp-irc-bot"})
    with _urlopen(req, timeout=_CRTSH_TIMEOUT) as resp:
        return json.loads(resp.read())
def check_live_cert(domain: str) -> dict | None:
    """Connect to domain:443 and return the served certificate's details.

    A verified handshake is tried first; on success the full dict from
    ``getpeercert()`` is returned. If that fails (an expired certificate
    is exactly the case of interest), the handshake is retried without
    verification. ``getpeercert()`` returns an *empty* dict for a peer
    that was not validated, so in that case the raw DER certificate is
    fetched instead and only its expiry is decoded.

    Args:
        domain: Host name to connect to on port 443.

    Returns:
        The ``getpeercert()`` dict when verification succeeded, a dict
        containing only ``"notAfter"`` when the certificate could only be
        read unverified, or ``None`` if no certificate could be obtained
        or decoded. Each connection attempt uses a 10 second timeout.
    """
    attempts = ((_make_verified_ctx, True), (_make_unverified_ctx, False))
    for ctx_factory, verified in attempts:
        ctx = ctx_factory()
        try:
            with _create_connection((domain, 443), timeout=10) as sock:
                with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                    if verified:
                        return ssock.getpeercert()
                    # Unverified peers yield {} from getpeercert(); the
                    # binary form is available regardless of validation.
                    der = ssock.getpeercert(binary_form=True)
        except OSError:  # ssl.SSLError is a subclass of OSError
            continue
        not_after = _der_not_after(der) if der else None
        return {"notAfter": not_after} if not_after else None
    return None


def _der_tlv(buf: bytes, pos: int) -> tuple[int, int, int]:
    """Read one DER element at *pos*; return (tag, content_start, content_end)."""
    tag = buf[pos]
    length = buf[pos + 1]
    start = pos + 2
    if length & 0x80:
        # Long form: the low 7 bits give the number of length octets.
        width = length & 0x7F
        if width == 0 or start + width > len(buf):
            raise ValueError("malformed DER length")
        length = int.from_bytes(buf[start:start + width], "big")
        start += width
    end = start + length
    if end > len(buf):
        raise ValueError("truncated DER element")
    return tag, start, end


def _der_not_after(der: bytes) -> str | None:
    """Return a DER X.509 certificate's notAfter as 'Mon DD HH:MM:SS YYYY GMT'.

    Returns None when the structure cannot be walked or the time is not a
    well-formed UTCTime / GeneralizedTime value.
    """
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    try:
        _, pos, _ = _der_tlv(der, 0)          # Certificate SEQUENCE
        _, pos, _ = _der_tlv(der, pos)        # tbsCertificate SEQUENCE
        tag, _, pos = _der_tlv(der, pos)      # [0] version or serialNumber
        if tag == 0xA0:
            _, _, pos = _der_tlv(der, pos)    # serialNumber
        _, _, pos = _der_tlv(der, pos)        # signature algorithm
        _, _, pos = _der_tlv(der, pos)        # issuer
        _, pos, _ = _der_tlv(der, pos)        # validity: step inside
        _, _, pos = _der_tlv(der, pos)        # notBefore
        tag, start, end = _der_tlv(der, pos)  # notAfter
        stamp = der[start:end].decode("ascii")
        if tag == 0x17:
            # UTCTime (YYMMDDHHMMSSZ): two-digit years pivot at 50.
            short_year = int(stamp[:2])
            year = (1900 if short_year >= 50 else 2000) + short_year
            rest = stamp[2:]
        elif tag == 0x18:
            # GeneralizedTime (YYYYMMDDHHMMSSZ).
            year = int(stamp[:4])
            rest = stamp[4:]
        else:
            return None
        if len(rest) != 11 or rest[-1] != "Z":
            return None
        month, day, hour, minute, second = (
            int(rest[i:i + 2]) for i in range(0, 10, 2)
        )
        # Constructing a datetime validates the individual fields.
        when = datetime(year, month, day, hour, minute, second)
    except (IndexError, ValueError):
        return None
    return f"{months[when.month - 1]} {when.day:2d} {when:%H:%M:%S} {when.year} GMT"
def _make_verified_ctx() -> ssl.SSLContext:
    """Return an SSL context built with ``ssl.create_default_context()`` defaults."""
    return ssl.create_default_context()
def _make_unverified_ctx() -> ssl.SSLContext:
    """Return an SSL context with hostname checking and cert verification off."""
    ctx = ssl.create_default_context()
    # Order matters: the ssl module refuses CERT_NONE while check_hostname
    # is still enabled, so hostname checking is switched off first.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx
# -- pure helpers -----------------------------------------------------------
def deduplicate(certs: list[dict]) -> list[dict]:
    """Collapse certificate entries that share a ``serial_number``.

    The first entry seen for each serial is kept, in input order.
    Entries whose serial is missing or empty are dropped.
    """
    first_by_serial: dict[str, dict] = {}
    for entry in certs:
        key = entry.get("serial_number", "")
        if not key:
            continue
        # setdefault stores only the first entry seen for a serial.
        first_by_serial.setdefault(key, entry)
    return [*first_by_serial.values()]
def parse_crtsh_ts(ts: str) -> datetime:
    """Turn a crt.sh timestamp into a UTC-aware datetime.

    Accepts 'YYYY-MM-DDTHH:MM:SS' with or without a '.fff' fractional
    part, neither carrying a trailing Z. Surrounding whitespace is
    ignored.

    Raises:
        ValueError: if the text matches neither layout.
    """
    text = ts.strip()
    # A literal "." can only ever match the fractional layout, and its
    # absence only the plain one, so one attempt is enough.
    layout = "%Y-%m-%dT%H:%M:%S.%f" if "." in text else "%Y-%m-%dT%H:%M:%S"
    try:
        naive = datetime.strptime(text, layout)
    except ValueError:
        raise ValueError(f"unrecognised timestamp: {text}") from None
    return naive.replace(tzinfo=timezone.utc)
def is_expired(cert: dict) -> bool:
    """Report whether a crt.sh entry's ``not_after`` lies in the past.

    Entries with a missing, empty or unparseable ``not_after`` are
    treated as not expired.
    """
    raw_expiry = cert.get("not_after", "")
    if raw_expiry:
        try:
            expires_at = parse_crtsh_ts(raw_expiry)
        except ValueError:
            # Unparseable timestamp: fall through to "not expired".
            pass
        else:
            return expires_at < datetime.now(timezone.utc)
    return False
def is_live_cert_expired(cert_dict: dict) -> bool:
    """Check if a live SSL cert (from getpeercert()) is expired.

    The ``notAfter`` value ('Mon DD HH:MM:SS YYYY GMT') is parsed with
    ``ssl.cert_time_to_seconds``, which always uses English month names,
    so the result does not depend on the process locale.

    Args:
        cert_dict: Certificate dict as produced by ``getpeercert()``.

    Returns:
        True if ``notAfter`` is in the past. False if it is in the
        future, missing, empty, or not in the expected format.
    """
    not_after = cert_dict.get("notAfter", "")
    if not not_after:
        return False
    try:
        expiry_epoch = ssl.cert_time_to_seconds(not_after)
    except ValueError:
        # Unexpected format: expiry unknown, do not flag as expired.
        return False
    return expiry_epoch < datetime.now(timezone.utc).timestamp()
def format_result(domain: str, total: int, expired_count: int,
valid_count: int, live_expired: bool | None) -> str:
"""Format a single domain result as one IRC line."""
line = f"{domain} -- {total} certs ({expired_count} expired, {valid_count} valid)"
if live_expired is True:
line += " | live cert EXPIRED"
elif live_expired is False and expired_count > 0:
line += " | live cert ok"
return line
# -- async orchestration ----------------------------------------------------
async def analyze_domain(domain: str) -> str:
    """Full pipeline for one domain: fetch, dedup, analyze, live check.

    Blocking work runs in the shared thread pool. Failures of the crt.sh
    fetch are turned into an error line rather than raised; the live
    certificate check is best-effort and only affects the suffix.

    Args:
        domain: Domain name to analyse.

    Returns:
        One formatted result (or error) line for the domain.
    """
    loop = asyncio.get_running_loop()

    # Fetch crt.sh data. The outer timeout sits a little above the HTTP
    # timeout so the request's own error normally surfaces first.
    try:
        raw = await asyncio.wait_for(
            loop.run_in_executor(_pool, fetch_crtsh, domain),
            timeout=_CRTSH_TIMEOUT + 5,
        )
    except (asyncio.TimeoutError, TimeoutError):
        # asyncio.TimeoutError is only an alias of the builtin from
        # Python 3.11 on; naming both keeps the message right on 3.10.
        return f"{domain} -- error: timeout"
    except Exception as exc:
        reason = str(exc)[:80] or type(exc).__name__
        return f"{domain} -- error: {reason}"

    if not raw:
        return f"{domain} -- 0 certs"

    # Dedup and classify
    unique = deduplicate(raw)
    total = len(unique)
    expired_count = sum(1 for cert in unique if is_expired(cert))
    valid_count = total - expired_count

    # Live cert check (only if there are expired certs to flag)
    live_expired: bool | None = None
    if expired_count > 0:
        try:
            cert_dict = await asyncio.wait_for(
                loop.run_in_executor(_pool, check_live_cert, domain),
                timeout=15.0,
            )
            # getpeercert() gives an empty dict for an unverified peer;
            # without a notAfter the expiry is unknown, so leave the
            # result as None instead of reporting "live cert ok".
            if cert_dict and cert_dict.get("notAfter"):
                live_expired = is_live_cert_expired(cert_dict)
        except Exception as exc:
            # Best effort: the summary line is still useful without it.
            log.debug("live cert check failed for %s: %r", domain, exc)

    return format_result(domain, total, expired_count, valid_count, live_expired)
@command("cert", help="Lookup CT logs for domain(s): !cert <domain> [domain2 ...]")
async def cmd_cert(bot, message):
    """Handle ``!cert``: look up each requested domain on crt.sh and reply.

    Arguments beyond the first ``_MAX_DOMAINS`` are ignored. With no
    arguments a usage hint is sent instead.
    """
    tokens = message.text.split()
    # Drop the command word, then cap the number of domains.
    domains = tokens[1:][:_MAX_DOMAINS]

    if domains:
        await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...")
        # All domains are analysed concurrently; gather keeps input order.
        lines = await asyncio.gather(*(analyze_domain(name) for name in domains))
        await bot.long_reply(message, list(lines), label="certs")
        return

    await bot.reply(message, f"Usage: !cert <domain> [domain2 ...] (max {_MAX_DOMAINS})")