"""Plugin: TLS certificate and cipher inspector (SOCKS5-proxied).""" from __future__ import annotations import asyncio import hashlib import ssl from datetime import datetime, timezone from derp.http import create_connection as _create_connection from derp.plugin import command _TIMEOUT = 10 def _inspect(host: str, port: int) -> dict: """Blocking TLS connection to extract cert and cipher info.""" result: dict = { "error": "", "version": "", "cipher": "", "bits": 0, "subject": "", "issuer": "", "san": [], "not_before": "", "not_after": "", "days_left": 0, "serial": "", "fingerprint": "", } ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE try: with _create_connection((host, port), timeout=_TIMEOUT) as sock: with ctx.wrap_socket(sock, server_hostname=host) as ssock: result["version"] = ssock.version() or "" cipher = ssock.cipher() if cipher: result["cipher"] = cipher[0] result["bits"] = cipher[2] cert_bin = ssock.getpeercert(binary_form=True) cert = ssock.getpeercert() if cert_bin: result["fingerprint"] = hashlib.sha256(cert_bin).hexdigest()[:16] if cert: # Subject CN for rdn in cert.get("subject", ()): for attr, val in rdn: if attr == "commonName": result["subject"] = val # Issuer CN / O for rdn in cert.get("issuer", ()): for attr, val in rdn: if attr in ("commonName", "organizationName"): if not result["issuer"]: result["issuer"] = val # SAN san = cert.get("subjectAltName", ()) result["san"] = [v for t, v in san if t == "DNS"] # Validity dates nb = cert.get("notBefore", "") na = cert.get("notAfter", "") result["not_before"] = nb result["not_after"] = na if na: try: exp = datetime.strptime(na, "%b %d %H:%M:%S %Y %Z") exp = exp.replace(tzinfo=timezone.utc) delta = exp - datetime.now(timezone.utc) result["days_left"] = delta.days except ValueError: pass result["serial"] = cert.get("serialNumber", "") except (OSError, ssl.SSLError) as exc: result["error"] = str(exc) return result @command("tlscheck", help="TLS inspect: !tlscheck [port]") async def cmd_tlscheck(bot, message): """Inspect TLS version, cipher, and certificate details. Usage: !tlscheck example.com !tlscheck 10.0.0.1 8443 """ parts = message.text.split(None, 3) if len(parts) < 2: await bot.reply(message, "Usage: !tlscheck [port]") return host = parts[1] port = 443 if len(parts) > 2: try: port = int(parts[2]) if not 1 <= port <= 65535: raise ValueError except ValueError: await bot.reply(message, f"Invalid port: {parts[2]}") return loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, _inspect, host, port) if result["error"]: await bot.reply(message, f"{host}:{port} -> TLS error: {result['error']}") return info = [f"{result['version']} {result['cipher']} ({result['bits']}bit)"] if result["subject"]: info.append(f"CN={result['subject']}") if result["issuer"]: info.append(f"issuer={result['issuer']}") if result["days_left"]: label = "expires" if result["days_left"] > 0 else "EXPIRED" info.append(f"{label} in {abs(result['days_left'])}d") if result["san"]: san_str = ", ".join(result["san"][:5]) if len(result["san"]) > 5: san_str += f" (+{len(result['san']) - 5})" info.append(f"SAN: {san_str}") await bot.reply(message, f"{host}:{port} -> {' | '.join(info)}")