feat: add crt.sh certificate transparency lookup plugin

Query CT logs via crt.sh to enumerate SSL certs for domains,
report expired/valid counts, and flag live expired certs.
Uses ThreadPoolExecutor(3) for blocking I/O on RPi5.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-15 00:56:55 +01:00
parent 118fbb75d1
commit ad18a902dd
5 changed files with 322 additions and 0 deletions

View File

@@ -12,4 +12,5 @@
| P0 | [x] | Unit tests for parser and plugins |
| P0 | [x] | Documentation |
| P1 | [ ] | Test against live IRC server |
| P1 | [x] | crt.sh CT lookup plugin (`!cert`) |
| P2 | [ ] | SASL authentication |

View File

@@ -20,6 +20,7 @@ derp -v # Verbose/debug mode
!help <cmd> # Command help
!version # Bot version
!echo <text> # Echo text back
!cert <domain> # CT log lookup (max 5 domains)
```
## Plugin Template

View File

@@ -51,6 +51,30 @@ level = "info" # Logging level: debug, info, warning, error
| `!help <cmd>` | Show help for a specific command |
| `!version` | Show bot version |
| `!echo <text>` | Echo back text (example plugin) |
| `!cert <domain> [...]` | Lookup CT logs for up to 5 domains |
### `!cert` -- Certificate Transparency Lookup
Query [crt.sh](https://crt.sh) CT logs to enumerate SSL certificates for
domains. Reports totals (expired/valid) and flags domains still serving
expired certs.
```
!cert example.com
!cert example.com badsite.com another.org
```
Output format:
```
example.com -- 127 certs (23 expired, 104 valid)
badsite.com -- 45 certs (8 expired, 37 valid) | live cert EXPIRED
broken.test -- error: timeout
```
- Max 5 domains per invocation; any additional domains are silently ignored
- crt.sh can be slow; the bot confirms receipt before querying
- Live cert check runs only when expired CT entries exist
## Writing Plugins

186
plugins/crtsh.py Normal file
View File

@@ -0,0 +1,186 @@
"""Plugin: crt.sh certificate transparency lookup.
Query CT logs to enumerate SSL certificates for domains, report
totals (expired/valid), and flag domains still serving expired certs.
"""
import asyncio
import json
import logging
import socket
import ssl
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from derp.plugin import command
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Shared pool -- 3 workers keeps RPi5 happy while allowing meaningful
# parallelism (each crt.sh request blocks for seconds).
# Both the crt.sh fetch and the live-cert check are submitted to this pool.
_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="crtsh")
# crt.sh JSON endpoint template. "%25" is a URL-encoded "%", so the query sent
# is "%.<domain>" (presumably crt.sh's wildcard for names under the domain --
# verify against the crt.sh documentation).
_CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json"
# Timeout, in seconds, passed to urlopen() for the crt.sh request.
_CRTSH_TIMEOUT = 30
# Maximum number of domain arguments processed per !cert invocation.
_MAX_DOMAINS = 5
# -- blocking helpers (run in thread pool) ---------------------------------
def fetch_crtsh(domain: str) -> list[dict]:
    """GET crt.sh JSON for a domain. Blocking.

    The domain comes straight from chat input, so it is percent-encoded
    before being placed in the query string. Without this, characters such
    as ``&``, ``#`` or ``%`` would change the request, and non-ASCII (IDN)
    names would make ``urlopen`` fail while encoding the request line.

    Args:
        domain: Domain name to look up (untrusted user input).

    Returns:
        The decoded JSON payload from crt.sh.

    Raises:
        urllib.error.URLError: On network or HTTP failure.
        TimeoutError: If the request exceeds ``_CRTSH_TIMEOUT`` seconds.
        ValueError: If the response body is not valid JSON.
    """
    # Local import keeps this block self-contained; the file only imports
    # urllib.request at module level.
    from urllib.parse import quote

    # safe="" so that "/" is escaped as well; "." and "-" are never escaped.
    url = _CRTSH_URL.format(domain=quote(domain, safe=""))
    req = urllib.request.Request(url, headers={"User-Agent": "derp-irc-bot"})
    with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp:
        return json.loads(resp.read())
def _der_tlv(buf: bytes, pos: int) -> tuple[int, int, int]:
    """Read one DER tag/length header at *pos*.

    Returns ``(tag, value_start, value_end)``. Only single-byte tags are
    supported, which is all the certificate fields walked here need.
    Raises IndexError/ValueError on truncated input.
    """
    tag = buf[pos]
    length = buf[pos + 1]
    pos += 2
    if length & 0x80:  # long form: low 7 bits give the count of length octets
        n_octets = length & 0x7F
        length = int.from_bytes(buf[pos:pos + n_octets], "big")
        pos += n_octets
    end = pos + length
    if end > len(buf):
        raise ValueError("truncated DER value")
    return tag, pos, end


def _der_not_after(der: bytes) -> str | None:
    """Extract notAfter from a DER X.509 cert in getpeercert() text format.

    Returns a string such as ``'Jan  5 09:34:43 2018 GMT'``, or None when the
    certificate cannot be decoded.
    """
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    try:
        _, pos, _ = _der_tlv(der, 0)          # Certificate SEQUENCE
        _, pos, _ = _der_tlv(der, pos)        # tbsCertificate SEQUENCE
        tag, _, end = _der_tlv(der, pos)      # [0] version or serialNumber
        if tag == 0xA0:                       # explicit version is optional
            _, _, end = _der_tlv(der, end)    # serialNumber
        _, _, end = _der_tlv(der, end)        # signature algorithm
        _, _, end = _der_tlv(der, end)        # issuer
        _, pos, _ = _der_tlv(der, end)        # validity SEQUENCE
        _, _, end = _der_tlv(der, pos)        # notBefore
        tag, start, end = _der_tlv(der, end)  # notAfter
        text = der[start:end].decode("ascii")
        if tag == 0x17:    # UTCTime (YYMMDDHHMMSSZ): YY >= 50 means 19YY
            text = ("19" if int(text[:2]) >= 50 else "20") + text
        elif tag != 0x18:  # anything but GeneralizedTime is unexpected
            return None
        expiry = datetime.strptime(text, "%Y%m%d%H%M%SZ")
    except (IndexError, ValueError):
        return None
    # English month names are spelled out so the result does not depend on
    # the process locale.
    return (f"{months[expiry.month - 1]} {expiry.day:2d} "
            f"{expiry:%H:%M:%S} {expiry.year} GMT")


def check_live_cert(domain: str) -> dict | None:
    """Connect to domain:443, return cert dict or None on failure.

    A verified handshake is tried first. If it fails (an expired cert is
    the point), the connection is retried without verification. On that
    path ``getpeercert()`` returns an empty dict, so the DER certificate is
    fetched and only its expiry is decoded.

    Args:
        domain: Host name to connect to on port 443.

    Returns:
        The full ``getpeercert()`` dict when verification succeeds; a dict
        containing only ``"notAfter"`` when the unverified fallback was
        needed; None if no connection worked or the cert was undecodable.
    """
    # Try verified first, fall back to unverified (expired cert is the point)
    for ctx_factory in (_make_verified_ctx, _make_unverified_ctx):
        ctx = ctx_factory()
        try:
            with socket.create_connection((domain, 443), timeout=10) as sock:
                with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    # Empty dict means the peer cert was not validated.
                    der = None if cert else ssock.getpeercert(binary_form=True)
        except (OSError, ssl.SSLError, ValueError):
            # ValueError covers host names the ssl/idna layers reject.
            continue
        if cert:
            return cert
        not_after = _der_not_after(der) if der else None
        return {"notAfter": not_after} if not_after else None
    return None
def _make_verified_ctx() -> ssl.SSLContext:
    """Build a TLS client context with the library's default verification."""
    verified = ssl.create_default_context()
    return verified
def _make_unverified_ctx() -> ssl.SSLContext:
    """Build a TLS client context that accepts any peer certificate."""
    insecure = ssl.create_default_context()
    # Hostname checking is switched off before the verify mode is relaxed.
    insecure.check_hostname = False
    insecure.verify_mode = ssl.CERT_NONE
    return insecure
# -- pure helpers -----------------------------------------------------------
def deduplicate(certs: list[dict]) -> list[dict]:
    """Collapse entries sharing a serial_number, keeping the first seen.

    Entries with a missing or empty serial_number are dropped. The result
    preserves first-occurrence order.
    """
    first_by_serial: dict[str, dict] = {}
    for entry in certs:
        key = entry.get("serial_number", "")
        if not key:
            continue
        first_by_serial.setdefault(key, entry)
    return list(first_by_serial.values())
def parse_crtsh_ts(ts: str) -> datetime:
    """Convert a crt.sh timestamp string to a UTC-aware datetime.

    Accepts 'YYYY-MM-DDTHH:MM:SS' with or without a fractional-seconds
    suffix ('.fff'), neither carrying a trailing Z. Surrounding whitespace
    is ignored.

    Raises:
        ValueError: If the string matches neither layout.
    """
    ts = ts.strip()
    parsed = None
    for layout in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
        try:
            parsed = datetime.strptime(ts, layout)
        except ValueError:
            continue
        break
    if parsed is None:
        raise ValueError(f"unrecognised timestamp: {ts}")
    return parsed.replace(tzinfo=timezone.utc)
def is_expired(cert: dict) -> bool:
    """Tell whether a crt.sh entry's not_after lies in the past.

    A missing, empty or unparseable not_after counts as not expired.
    """
    raw_expiry = cert.get("not_after", "")
    if raw_expiry:
        try:
            expiry = parse_crtsh_ts(raw_expiry)
        except ValueError:
            return False
        return expiry < datetime.now(timezone.utc)
    return False
def is_live_cert_expired(cert_dict: dict) -> bool:
    """Check if a live SSL cert (from getpeercert()) is expired.

    The ``notAfter`` text is parsed with ``ssl.cert_time_to_seconds``, which
    understands the 'Mon DD HH:MM:SS YYYY GMT' layout regardless of the
    process locale (``strptime`` with ``%b`` does not).

    Args:
        cert_dict: Mapping with an optional ``"notAfter"`` string.

    Returns:
        True if ``notAfter`` is in the past; False if it is in the future,
        missing, empty, or not in the expected format.
    """
    not_after = cert_dict.get("notAfter", "")
    if not not_after:
        return False
    try:
        # Format: 'Mon DD HH:MM:SS YYYY GMT'
        expiry_epoch = ssl.cert_time_to_seconds(not_after)
    except ValueError:
        return False
    return expiry_epoch < datetime.now(timezone.utc).timestamp()
def format_result(domain: str, total: int, expired_count: int,
valid_count: int, live_expired: bool | None) -> str:
"""Format a single domain result as one IRC line."""
line = f"{domain} -- {total} certs ({expired_count} expired, {valid_count} valid)"
if live_expired is True:
line += " | live cert EXPIRED"
elif live_expired is False and expired_count > 0:
line += " | live cert ok"
return line
# -- async orchestration ----------------------------------------------------
async def analyze_domain(domain: str) -> str:
    """Full pipeline for one domain: fetch, dedup, analyze, live check.

    Args:
        domain: Domain name to analyse.

    Returns:
        One IRC-ready line: a summary from ``format_result``, ``"<domain> --
        0 certs"`` for an empty result, or ``"<domain> -- error: <reason>"``
        when the crt.sh lookup fails. Lookup errors are reported in the
        returned line rather than raised.
    """
    loop = asyncio.get_running_loop()
    # Fetch crt.sh data.
    # NOTE(review): the wait_for timer starts at submission, so time spent
    # queued behind other jobs in the 3-worker pool counts toward the timeout.
    try:
        raw = await asyncio.wait_for(
            loop.run_in_executor(_pool, fetch_crtsh, domain),
            timeout=35.0,
        )
    except (TimeoutError, asyncio.TimeoutError):
        # asyncio.TimeoutError is only an alias of TimeoutError from 3.11 on.
        return f"{domain} -- error: timeout"
    except Exception as exc:
        reason = str(exc)[:80] or type(exc).__name__
        return f"{domain} -- error: {reason}"
    if not raw:
        return f"{domain} -- 0 certs"
    if not isinstance(raw, list):
        # Valid JSON of the wrong shape would otherwise crash deduplicate().
        return f"{domain} -- error: unexpected response"
    # Dedup and classify
    unique = deduplicate([entry for entry in raw if isinstance(entry, dict)])
    total = len(unique)
    expired_count = sum(1 for c in unique if is_expired(c))
    valid_count = total - expired_count
    # Live cert check (only if there are expired certs to flag)
    live_expired: bool | None = None
    if expired_count > 0:
        try:
            cert_dict = await asyncio.wait_for(
                loop.run_in_executor(_pool, check_live_cert, domain),
                timeout=15.0,
            )
            if cert_dict is not None:
                live_expired = is_live_cert_expired(cert_dict)
        except Exception as exc:
            # Best-effort: the CT summary is still reported without the
            # live annotation, but the failure is logged.
            log.debug("live cert check failed for %s: %r", domain, exc)
    return format_result(domain, total, expired_count, valid_count, live_expired)
@command("cert", help="Lookup CT logs for domain(s): !cert <domain> [domain2 ...]")
async def cmd_cert(bot, message):
    """Handle !cert: look up each requested domain and reply line by line.

    Only the first _MAX_DOMAINS arguments are used. With no arguments a
    usage hint is sent instead.
    """
    # Drop the command word, then cap the argument list.
    domains = message.text.split()[1:][:_MAX_DOMAINS]
    if len(domains) == 0:
        await bot.reply(message, f"Usage: !cert <domain> [domain2 ...] (max {_MAX_DOMAINS})")
        return
    # Acknowledge first, then run all lookups concurrently.
    await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...")
    lookups = (analyze_domain(name) for name in domains)
    for summary in await asyncio.gather(*lookups):
        await bot.reply(message, summary)

110
tests/test_crtsh.py Normal file
View File

@@ -0,0 +1,110 @@
"""Tests for the crt.sh certificate transparency plugin."""
from datetime import datetime, timezone
from plugins.crtsh import (
deduplicate,
format_result,
is_expired,
is_live_cert_expired,
parse_crtsh_ts,
)
class TestDeduplicate:
    """deduplicate() keeps one entry per non-empty serial_number."""

    def test_removes_duplicate_serials(self):
        entries = [
            {"serial_number": "AAA", "common_name": "a.example.com"},
            {"serial_number": "BBB", "common_name": "b.example.com"},
            {"serial_number": "AAA", "common_name": "a.example.com (dup)"},
        ]
        unique = deduplicate(entries)
        assert len(unique) == 2
        assert {entry["serial_number"] for entry in unique} == {"AAA", "BBB"}

    def test_keeps_first_occurrence(self):
        first = {"serial_number": "AAA", "common_name": "first"}
        second = {"serial_number": "AAA", "common_name": "second"}
        unique = deduplicate([first, second])
        assert unique[0]["common_name"] == "first"

    def test_empty_input(self):
        assert deduplicate([]) == []

    def test_no_serial_field(self):
        # Entries without a usable serial are dropped entirely.
        missing = {"common_name": "no-serial"}
        blank = {"serial_number": "", "common_name": "empty"}
        assert len(deduplicate([missing, blank])) == 0

    def test_all_unique(self):
        entries = [
            {"serial_number": "A", "common_name": "a"},
            {"serial_number": "B", "common_name": "b"},
            {"serial_number": "C", "common_name": "c"},
        ]
        unique = deduplicate(entries)
        assert len(unique) == 3
class TestExpiredCheck:
    """is_expired() and parse_crtsh_ts() on crt.sh-style timestamps."""

    def test_expired_cert(self):
        assert is_expired({"not_after": "2020-01-01T00:00:00"}) is True

    def test_valid_cert(self):
        assert is_expired({"not_after": "2099-12-31T23:59:59"}) is False

    def test_missing_not_after(self):
        # Absent and empty values are both treated as "not expired".
        for entry in ({}, {"not_after": ""}):
            assert is_expired(entry) is False

    def test_fractional_seconds(self):
        assert is_expired({"not_after": "2020-06-15T12:30:45.123"}) is True

    def test_parse_timestamp_basic(self):
        expected = datetime(2024, 3, 15, 10, 30, 0, tzinfo=timezone.utc)
        assert parse_crtsh_ts("2024-03-15T10:30:00") == expected

    def test_parse_timestamp_fractional(self):
        parsed = parse_crtsh_ts("2024-03-15T10:30:00.500")
        assert parsed.microsecond == 500000
class TestLiveCertExpired:
    """is_live_cert_expired() on getpeercert()-style dicts."""

    def test_expired_live_cert(self):
        peer_cert = {"notAfter": "Jan 1 00:00:00 2020 GMT"}
        expired = is_live_cert_expired(peer_cert)
        assert expired is True

    def test_valid_live_cert(self):
        peer_cert = {"notAfter": "Dec 31 23:59:59 2099 GMT"}
        expired = is_live_cert_expired(peer_cert)
        assert expired is False

    def test_missing_field(self):
        # No notAfter key at all counts as "not expired".
        expired = is_live_cert_expired({})
        assert expired is False
class TestFormatResult:
    """format_result() output for each live-check state."""

    def test_basic(self):
        expected = "example.com -- 100 certs (10 expired, 90 valid)"
        assert format_result("example.com", 100, 10, 90, None) == expected

    def test_live_expired(self):
        rendered = format_result("bad.com", 50, 5, 45, True)
        assert "live cert EXPIRED" in rendered

    def test_live_ok_with_expired_certs(self):
        rendered = format_result("ok.com", 50, 5, 45, False)
        assert "live cert ok" in rendered

    def test_live_ok_no_expired(self):
        """The live annotation is left out when nothing has expired."""
        rendered = format_result("clean.com", 50, 0, 50, False)
        assert "live cert" not in rendered

    def test_zero_certs(self):
        expected = "empty.com -- 0 certs (0 expired, 0 valid)"
        assert format_result("empty.com", 0, 0, 0, None) == expected