Add Bot.long_reply() that sends lines directly when under threshold, or creates a FlaskPaste paste with preview + link when over. Refactor abuseipdb, alert history, crtsh, dork, exploitdb, and subdomain plugins to use long_reply(). Configurable paste_threshold (default: 4). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
186 lines
6.1 KiB
Python
186 lines
6.1 KiB
Python
"""Plugin: crt.sh certificate transparency lookup.
|
|
|
|
Query CT logs to enumerate SSL certificates for domains, report
|
|
totals (expired/valid), and flag domains still serving expired certs.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import ssl
|
|
import urllib.request
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime, timezone
|
|
|
|
from derp.http import create_connection as _create_connection
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Shared pool -- 3 workers keeps RPi5 happy while allowing meaningful
|
|
# parallelism (each crt.sh request blocks for seconds).
|
|
_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="crtsh")
|
|
|
|
_CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json"
|
|
_CRTSH_TIMEOUT = 30
|
|
_MAX_DOMAINS = 5
|
|
|
|
|
|
# -- blocking helpers (run in thread pool) ---------------------------------
|
|
|
|
def fetch_crtsh(domain: str) -> list[dict]:
|
|
"""GET crt.sh JSON for a domain. Blocking."""
|
|
url = _CRTSH_URL.format(domain=domain)
|
|
req = urllib.request.Request(url, headers={"User-Agent": "derp-irc-bot"})
|
|
with _urlopen(req, timeout=_CRTSH_TIMEOUT) as resp:
|
|
return json.loads(resp.read())
|
|
|
|
|
|
def check_live_cert(domain: str) -> dict | None:
|
|
"""Connect to domain:443, return cert dict or None on failure."""
|
|
# Try verified first, fall back to unverified (expired cert is the point)
|
|
for ctx_factory in (_make_verified_ctx, _make_unverified_ctx):
|
|
ctx = ctx_factory()
|
|
try:
|
|
with _create_connection((domain, 443), timeout=10) as sock:
|
|
with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
|
|
return ssock.getpeercert()
|
|
except (OSError, ssl.SSLError):
|
|
continue
|
|
return None
|
|
|
|
|
|
def _make_verified_ctx() -> ssl.SSLContext:
|
|
return ssl.create_default_context()
|
|
|
|
|
|
def _make_unverified_ctx() -> ssl.SSLContext:
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
return ctx
|
|
|
|
|
|
# -- pure helpers -----------------------------------------------------------
|
|
|
|
def deduplicate(certs: list[dict]) -> list[dict]:
|
|
"""Deduplicate certificates by serial_number."""
|
|
seen: dict[str, dict] = {}
|
|
for cert in certs:
|
|
serial = cert.get("serial_number", "")
|
|
if serial and serial not in seen:
|
|
seen[serial] = cert
|
|
return list(seen.values())
|
|
|
|
|
|
def parse_crtsh_ts(ts: str) -> datetime:
|
|
"""Parse crt.sh timestamp to timezone-aware datetime.
|
|
|
|
Handles 'YYYY-MM-DDTHH:MM:SS' and 'YYYY-MM-DDTHH:MM:SS.fff'
|
|
formats, both without trailing Z.
|
|
"""
|
|
ts = ts.strip()
|
|
for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
|
|
try:
|
|
return datetime.strptime(ts, fmt).replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
continue
|
|
raise ValueError(f"unrecognised timestamp: {ts}")
|
|
|
|
|
|
def is_expired(cert: dict) -> bool:
|
|
"""Check if a crt.sh certificate entry is expired."""
|
|
not_after = cert.get("not_after", "")
|
|
if not not_after:
|
|
return False
|
|
try:
|
|
return parse_crtsh_ts(not_after) < datetime.now(timezone.utc)
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def is_live_cert_expired(cert_dict: dict) -> bool:
|
|
"""Check if a live SSL cert (from getpeercert()) is expired."""
|
|
not_after = cert_dict.get("notAfter", "")
|
|
if not not_after:
|
|
return False
|
|
try:
|
|
# Format: 'Mon DD HH:MM:SS YYYY GMT'
|
|
expiry = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
|
|
expiry = expiry.replace(tzinfo=timezone.utc)
|
|
return expiry < datetime.now(timezone.utc)
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def format_result(domain: str, total: int, expired_count: int,
|
|
valid_count: int, live_expired: bool | None) -> str:
|
|
"""Format a single domain result as one IRC line."""
|
|
line = f"{domain} -- {total} certs ({expired_count} expired, {valid_count} valid)"
|
|
if live_expired is True:
|
|
line += " | live cert EXPIRED"
|
|
elif live_expired is False and expired_count > 0:
|
|
line += " | live cert ok"
|
|
return line
|
|
|
|
|
|
# -- async orchestration ----------------------------------------------------
|
|
|
|
async def analyze_domain(domain: str) -> str:
|
|
"""Full pipeline for one domain: fetch, dedup, analyze, live check."""
|
|
loop = asyncio.get_running_loop()
|
|
|
|
# Fetch crt.sh data
|
|
try:
|
|
raw = await asyncio.wait_for(
|
|
loop.run_in_executor(_pool, fetch_crtsh, domain),
|
|
timeout=35.0,
|
|
)
|
|
except TimeoutError:
|
|
return f"{domain} -- error: timeout"
|
|
except Exception as exc:
|
|
reason = str(exc)[:80] if str(exc) else type(exc).__name__
|
|
return f"{domain} -- error: {reason}"
|
|
|
|
if not raw:
|
|
return f"{domain} -- 0 certs"
|
|
|
|
# Dedup and classify
|
|
unique = deduplicate(raw)
|
|
total = len(unique)
|
|
expired_certs = [c for c in unique if is_expired(c)]
|
|
expired_count = len(expired_certs)
|
|
valid_count = total - expired_count
|
|
|
|
# Live cert check (only if there are expired certs to flag)
|
|
live_expired: bool | None = None
|
|
if expired_count > 0:
|
|
try:
|
|
cert_dict = await asyncio.wait_for(
|
|
loop.run_in_executor(_pool, check_live_cert, domain),
|
|
timeout=15.0,
|
|
)
|
|
if cert_dict is not None:
|
|
live_expired = is_live_cert_expired(cert_dict)
|
|
except (TimeoutError, Exception):
|
|
pass # Skip live check silently
|
|
|
|
return format_result(domain, total, expired_count, valid_count, live_expired)
|
|
|
|
|
|
@command("cert", help="Lookup CT logs for domain(s): !cert <domain> [domain2 ...]")
|
|
async def cmd_cert(bot, message):
|
|
"""Query crt.sh for certificate transparency data."""
|
|
parts = message.text.split()
|
|
domains = parts[1:_MAX_DOMAINS + 1]
|
|
|
|
if not domains:
|
|
await bot.reply(message, f"Usage: !cert <domain> [domain2 ...] (max {_MAX_DOMAINS})")
|
|
return
|
|
|
|
await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...")
|
|
|
|
results = await asyncio.gather(*[analyze_domain(d) for d in domains])
|
|
await bot.long_reply(message, list(results), label="certs")
|