diff --git a/TASKS.md b/TASKS.md
index f0e3cef..dbf330a 100644
--- a/TASKS.md
+++ b/TASKS.md
@@ -12,4 +12,5 @@
 | P0 | [x] | Unit tests for parser and plugins |
 | P0 | [x] | Documentation |
 | P1 | [ ] | Test against live IRC server |
+| P1 | [x] | crt.sh CT lookup plugin (`!cert`) |
 | P2 | [ ] | SASL authentication |
diff --git a/docs/CHEATSHEET.md b/docs/CHEATSHEET.md
index ff4b8e7..0d10374 100644
--- a/docs/CHEATSHEET.md
+++ b/docs/CHEATSHEET.md
@@ -20,6 +20,7 @@ derp -v # Verbose/debug mode
 !help # Command help
 !version # Bot version
 !echo # Echo text back
+!cert <domain>  # CT log lookup (max 5 domains)
 ```
 
 ## Plugin Template
diff --git a/docs/USAGE.md b/docs/USAGE.md
index 56ec035..1a5610d 100644
--- a/docs/USAGE.md
+++ b/docs/USAGE.md
@@ -51,6 +51,30 @@ level = "info" # Logging level: debug, info, warning, error
 | `!help ` | Show help for a specific command |
 | `!version` | Show bot version |
 | `!echo ` | Echo back text (example plugin) |
+| `!cert <domain> [...]` | Lookup CT logs for up to 5 domains |
+
+### `!cert` -- Certificate Transparency Lookup
+
+Query [crt.sh](https://crt.sh) CT logs to enumerate SSL certificates for
+domains. Reports totals (expired/valid) and flags domains still serving
+expired certs.
+
+```
+!cert example.com
+!cert example.com badsite.com another.org
+```
+
+Output format:
+
+```
+example.com -- 127 certs (23 expired, 104 valid)
+badsite.com -- 45 certs (8 expired, 37 valid) | live cert EXPIRED
+broken.test -- error: timeout
+```
+
+- Max 5 domains per invocation
+- crt.sh can be slow; the bot confirms receipt before querying
+- Live cert check runs only when expired CT entries exist
 
 ## Writing Plugins
 
diff --git a/plugins/crtsh.py b/plugins/crtsh.py
new file mode 100644
index 0000000..aae5b55
--- /dev/null
+++ b/plugins/crtsh.py
@@ -0,0 +1,234 @@
+"""Plugin: crt.sh certificate transparency lookup.
+
+Query CT logs to enumerate SSL certificates for domains, report
+totals (expired/valid), and flag domains still serving expired certs.
+"""
+
+import asyncio
+import json
+import logging
+import socket
+import ssl
+import urllib.parse
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timezone
+
+from derp.plugin import command
+
+log = logging.getLogger(__name__)
+
+# Shared pool -- 3 workers keeps RPi5 happy while allowing meaningful
+# parallelism (each crt.sh request blocks for seconds).
+_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="crtsh")
+
+_CRTSH_URL = "https://crt.sh/?q=%25.{domain}&output=json"
+_CRTSH_TIMEOUT = 30
+_MAX_DOMAINS = 5
+
+# OpenSSL verify code for X509_V_ERR_CERT_HAS_EXPIRED.
+_X509_V_ERR_CERT_HAS_EXPIRED = 10
+
+
+# -- blocking helpers (run in thread pool) ---------------------------------
+
+def fetch_crtsh(domain: str) -> list[dict]:
+    """GET crt.sh JSON for a domain. Blocking."""
+    req = urllib.request.Request(
+        build_crtsh_url(domain), headers={"User-Agent": "derp-irc-bot"},
+    )
+    with urllib.request.urlopen(req, timeout=_CRTSH_TIMEOUT) as resp:
+        return json.loads(resp.read())
+
+
+def check_live_cert(domain: str) -> dict | None:
+    """Connect to domain:443 and describe the certificate it serves.
+
+    Returns the getpeercert() dict when the handshake verifies.  When
+    verification fails because the certificate has expired, returns a
+    marker dict with 'verify_code' and 'verify_message' instead.  An
+    unverified handshake is no use as a fallback: getpeercert() returns
+    an empty dict when the certificate was not validated.
+
+    Returns None when the host is unreachable or verification fails for
+    any other reason (expiry is then unknown).
+    """
+    ctx = _make_verified_ctx()
+    try:
+        with socket.create_connection((domain, 443), timeout=10) as sock:
+            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
+                return ssock.getpeercert()
+    except ssl.SSLCertVerificationError as exc:
+        if exc.verify_code == _X509_V_ERR_CERT_HAS_EXPIRED:
+            return {
+                "verify_code": exc.verify_code,
+                "verify_message": exc.verify_message,
+            }
+        log.debug("live cert for %s not verified: %s", domain, exc.verify_message)
+        return None
+    except (OSError, ValueError) as exc:
+        # ValueError covers hostnames the IDNA codec rejects.
+        log.debug("live cert check for %s failed: %s", domain, exc)
+        return None
+
+
+def _make_verified_ctx() -> ssl.SSLContext:
+    return ssl.create_default_context()
+
+
+# -- pure helpers -----------------------------------------------------------
+
+def build_crtsh_url(domain: str) -> str:
+    """Return the crt.sh JSON query URL for a domain and its subdomains.
+
+    The domain comes straight from IRC input, so it is percent-encoded
+    to keep characters such as '&' or '#' out of the query string.
+    """
+    return _CRTSH_URL.format(domain=urllib.parse.quote(domain, safe=""))
+
+
+def deduplicate(certs: list[dict]) -> list[dict]:
+    """Deduplicate certificates by serial_number.
+
+    Keeps the first entry per serial; entries without a serial are dropped.
+    """
+    seen: dict[str, dict] = {}
+    for cert in certs:
+        serial = cert.get("serial_number", "")
+        if serial and serial not in seen:
+            seen[serial] = cert
+    return list(seen.values())
+
+
+def parse_crtsh_ts(ts: str) -> datetime:
+    """Parse crt.sh timestamp to timezone-aware datetime.
+
+    Handles 'YYYY-MM-DDTHH:MM:SS' and 'YYYY-MM-DDTHH:MM:SS.fff'
+    formats, both without trailing Z.
+    """
+    ts = ts.strip()
+    for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
+        try:
+            return datetime.strptime(ts, fmt).replace(tzinfo=timezone.utc)
+        except ValueError:
+            continue
+    raise ValueError(f"unrecognised timestamp: {ts}")
+
+
+def is_expired(cert: dict) -> bool:
+    """Check if a crt.sh certificate entry is expired."""
+    not_after = cert.get("not_after", "")
+    if not not_after:
+        return False
+    try:
+        return parse_crtsh_ts(not_after) < datetime.now(timezone.utc)
+    except ValueError:
+        return False
+
+
+def is_live_cert_expired(cert_dict: dict) -> bool:
+    """Check if a live cert description from check_live_cert() is expired.
+
+    Accepts either a getpeercert() dict (judged by 'notAfter') or the
+    marker dict returned for an expired-certificate handshake failure.
+    """
+    if cert_dict.get("verify_code") == _X509_V_ERR_CERT_HAS_EXPIRED:
+        return True
+    not_after = cert_dict.get("notAfter", "")
+    if not not_after:
+        return False
+    try:
+        # Format: 'Mon DD HH:MM:SS YYYY GMT'.  cert_time_to_seconds() does
+        # not depend on the process locale, unlike strptime('%b').
+        expiry = ssl.cert_time_to_seconds(not_after)
+    except ValueError:
+        return False
+    return expiry < datetime.now(timezone.utc).timestamp()
+
+
+def format_result(domain: str, total: int, expired_count: int,
+                  valid_count: int, live_expired: bool | None) -> str:
+    """Format a single domain result as one IRC line."""
+    line = f"{domain} -- {total} certs ({expired_count} expired, {valid_count} valid)"
+    if live_expired is True:
+        line += " | live cert EXPIRED"
+    elif live_expired is False and expired_count > 0:
+        line += " | live cert ok"
+    return line
+
+
+# -- async orchestration ----------------------------------------------------
+
+async def analyze_domain(domain: str) -> str:
+    """Full pipeline for one domain: fetch, dedup, analyze, live check.
+
+    Fetch failures are reported in the returned line rather than raised.
+    """
+    loop = asyncio.get_running_loop()
+
+    # Fetch crt.sh data
+    try:
+        raw = await asyncio.wait_for(
+            loop.run_in_executor(_pool, fetch_crtsh, domain),
+            timeout=35.0,
+        )
+    except (asyncio.TimeoutError, TimeoutError):
+        # asyncio.TimeoutError is a distinct class before Python 3.11.
+        return f"{domain} -- error: timeout"
+    except Exception as exc:
+        reason = str(exc)[:80] if str(exc) else type(exc).__name__
+        return f"{domain} -- error: {reason}"
+
+    if not raw:
+        return f"{domain} -- 0 certs"
+
+    # Dedup and classify
+    unique = deduplicate(raw)
+    total = len(unique)
+    expired_certs = [c for c in unique if is_expired(c)]
+    expired_count = len(expired_certs)
+    valid_count = total - expired_count
+
+    # Live cert check (only if there are expired certs to flag)
+    live_expired: bool | None = None
+    if expired_count > 0:
+        try:
+            cert_dict = await asyncio.wait_for(
+                loop.run_in_executor(_pool, check_live_cert, domain),
+                timeout=15.0,
+            )
+        except Exception as exc:
+            # Best effort: the CT summary is still worth reporting.
+            log.debug("live cert check for %s skipped: %r", domain, exc)
+        else:
+            if cert_dict is not None:
+                live_expired = is_live_cert_expired(cert_dict)
+
+    return format_result(domain, total, expired_count, valid_count, live_expired)
+
+
+@command("cert", help="Lookup CT logs for domain(s): !cert <domain> [domain2 ...]")
+async def cmd_cert(bot, message):
+    """Query crt.sh for certificate transparency data."""
+    parts = message.text.split()
+    domains = parts[1:_MAX_DOMAINS + 1]
+
+    if not domains:
+        await bot.reply(
+            message, f"Usage: !cert <domain> [domain2 ...] (max {_MAX_DOMAINS})",
+        )
+        return
+
+    await bot.reply(message, f"Querying crt.sh for {len(domains)} domain(s)...")
+
+    # return_exceptions: one unexpected failure must not discard the
+    # results already gathered for the other domains.
+    results = await asyncio.gather(
+        *(analyze_domain(d) for d in domains), return_exceptions=True,
+    )
+
+    for domain, result in zip(domains, results):
+        if isinstance(result, BaseException):
+            log.error("cert lookup for %s failed", domain, exc_info=result)
+            result = f"{domain} -- error: {type(result).__name__}"
+        await bot.reply(message, result)
diff --git a/tests/test_crtsh.py b/tests/test_crtsh.py
new file mode 100644
index 0000000..08cf8fe
--- /dev/null
+++ b/tests/test_crtsh.py
@@ -0,0 +1,133 @@
+"""Tests for the crt.sh certificate transparency plugin."""
+
+from datetime import datetime, timezone
+
+from plugins.crtsh import (
+    build_crtsh_url,
+    deduplicate,
+    format_result,
+    is_expired,
+    is_live_cert_expired,
+    parse_crtsh_ts,
+)
+
+
+class TestDeduplicate:
+    def test_removes_duplicate_serials(self):
+        certs = [
+            {"serial_number": "AAA", "common_name": "a.example.com"},
+            {"serial_number": "BBB", "common_name": "b.example.com"},
+            {"serial_number": "AAA", "common_name": "a.example.com (dup)"},
+        ]
+        result = deduplicate(certs)
+        assert len(result) == 2
+        serials = {c["serial_number"] for c in result}
+        assert serials == {"AAA", "BBB"}
+
+    def test_keeps_first_occurrence(self):
+        certs = [
+            {"serial_number": "AAA", "common_name": "first"},
+            {"serial_number": "AAA", "common_name": "second"},
+        ]
+        result = deduplicate(certs)
+        assert result[0]["common_name"] == "first"
+
+    def test_empty_input(self):
+        assert deduplicate([]) == []
+
+    def test_no_serial_field(self):
+        certs = [{"common_name": "no-serial"}, {"serial_number": "", "common_name": "empty"}]
+        result = deduplicate(certs)
+        assert len(result) == 0
+
+    def test_all_unique(self):
+        certs = [
+            {"serial_number": "A", "common_name": "a"},
+            {"serial_number": "B", "common_name": "b"},
+            {"serial_number": "C", "common_name": "c"},
+        ]
+        assert len(deduplicate(certs)) == 3
+
+
+class TestExpiredCheck:
+    def test_expired_cert(self):
+        cert = {"not_after": "2020-01-01T00:00:00"}
+        assert is_expired(cert) is True
+
+    def test_valid_cert(self):
+        cert = {"not_after": "2099-12-31T23:59:59"}
+        assert is_expired(cert) is False
+
+    def test_missing_not_after(self):
+        assert is_expired({}) is False
+        assert is_expired({"not_after": ""}) is False
+
+    def test_fractional_seconds(self):
+        cert = {"not_after": "2020-06-15T12:30:45.123"}
+        assert is_expired(cert) is True
+
+    def test_parse_timestamp_basic(self):
+        dt = parse_crtsh_ts("2024-03-15T10:30:00")
+        assert dt == datetime(2024, 3, 15, 10, 30, 0, tzinfo=timezone.utc)
+
+    def test_parse_timestamp_fractional(self):
+        dt = parse_crtsh_ts("2024-03-15T10:30:00.500")
+        assert dt.microsecond == 500000
+
+
+class TestLiveCertExpired:
+    def test_expired_live_cert(self):
+        cert = {"notAfter": "Jan 1 00:00:00 2020 GMT"}
+        assert is_live_cert_expired(cert) is True
+
+    def test_valid_live_cert(self):
+        cert = {"notAfter": "Dec 31 23:59:59 2099 GMT"}
+        assert is_live_cert_expired(cert) is False
+
+    def test_missing_field(self):
+        assert is_live_cert_expired({}) is False
+
+    def test_expired_verification_marker(self):
+        """Marker dict produced when the handshake fails on expiry."""
+        cert = {"verify_code": 10, "verify_message": "certificate has expired"}
+        assert is_live_cert_expired(cert) is True
+
+    def test_other_verification_failure(self):
+        cert = {"verify_code": 18, "verify_message": "self-signed certificate"}
+        assert is_live_cert_expired(cert) is False
+
+    def test_malformed_not_after(self):
+        assert is_live_cert_expired({"notAfter": "not a date"}) is False
+
+
+class TestFormatResult:
+    def test_basic(self):
+        line = format_result("example.com", 100, 10, 90, None)
+        assert line == "example.com -- 100 certs (10 expired, 90 valid)"
+
+    def test_live_expired(self):
+        line = format_result("bad.com", 50, 5, 45, True)
+        assert "live cert EXPIRED" in line
+
+    def test_live_ok_with_expired_certs(self):
+        line = format_result("ok.com", 50, 5, 45, False)
+        assert "live cert ok" in line
+
+    def test_live_ok_no_expired(self):
+        """No live check annotation when zero expired certs."""
+        line = format_result("clean.com", 50, 0, 50, False)
+        assert "live cert" not in line
+
+    def test_zero_certs(self):
+        line = format_result("empty.com", 0, 0, 0, None)
+        assert line == "empty.com -- 0 certs (0 expired, 0 valid)"
+
+
+class TestBuildUrl:
+    def test_plain_domain(self):
+        url = build_crtsh_url("example.com")
+        assert url == "https://crt.sh/?q=%25.example.com&output=json"
+
+    def test_escapes_query_metacharacters(self):
+        url = build_crtsh_url("a.com&output=html#x")
+        assert url == "https://crt.sh/?q=%25.a.com%26output%3Dhtml%23x&output=json"