Files
derp/plugins/tlscheck.py
user 1bdba0ea06 feat: route raw TCP traffic through SOCKS5 proxy
Add create_connection and open_connection helpers to the shared proxy
module, covering portcheck, whois, tlscheck, and crtsh live-cert check.
UDP-based plugins (dns, blacklist, subdomain) stay direct.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:01:17 +01:00

140 lines
4.4 KiB
Python

"""Plugin: TLS certificate and cipher inspector (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import hashlib
import ssl
from datetime import datetime, timezone
from derp.http import create_connection as _create_connection
from derp.plugin import command
# Timeout handed to the connection helper as ``timeout=`` when opening the
# TCP connection (presumably seconds, as with socket timeouts -- confirm
# against derp.http.create_connection).
_TIMEOUT = 10
def _cert_summary(cert: dict) -> dict:
    """Pick the reported fields out of a decoded ``getpeercert()`` dict.

    Only keys for which a value was found are returned, so the caller can
    ``update()`` its defaults with the result.
    """
    summary: dict = {}

    # Subject CN (if several commonName attributes exist, the last one wins).
    for rdn in cert.get("subject", ()):
        for attr, val in rdn:
            if attr == "commonName":
                summary["subject"] = val

    # Issuer: first non-empty commonName / organizationName in cert order.
    issuer = next(
        (
            val
            for rdn in cert.get("issuer", ())
            for attr, val in rdn
            if attr in ("commonName", "organizationName") and val
        ),
        "",
    )
    if issuer:
        summary["issuer"] = issuer

    # SAN: DNS entries only.
    summary["san"] = [
        value for kind, value in cert.get("subjectAltName", ()) if kind == "DNS"
    ]

    # Validity dates are reported verbatim; days_left is derived from notAfter.
    not_after = cert.get("notAfter", "")
    summary["not_before"] = cert.get("notBefore", "")
    summary["not_after"] = not_after
    if not_after:
        try:
            # cert_time_to_seconds parses the OpenSSL date format in the C
            # locale, so the result does not depend on the process locale.
            expiry_ts = ssl.cert_time_to_seconds(not_after)
        except ValueError:
            pass  # unparseable date: leave days_left at its default
        else:
            now_ts = datetime.now(timezone.utc).timestamp()
            # Floor division mirrors timedelta.days (rounds towards -inf).
            summary["days_left"] = int((expiry_ts - now_ts) // 86400)

    summary["serial"] = cert.get("serialNumber", "")
    return summary


def _handshake(ctx: ssl.SSLContext, host: str, port: int) -> dict:
    """Connect, complete one TLS handshake with *ctx* and collect its details.

    Raises whatever the connection helper or ``wrap_socket`` raises; nothing
    is caught here.
    """
    found: dict = {}
    with _create_connection((host, port), timeout=_TIMEOUT) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as ssock:
            found["version"] = ssock.version() or ""
            cipher = ssock.cipher()
            if cipher:
                found["cipher"] = cipher[0]
                found["bits"] = cipher[2]
            cert_bin = ssock.getpeercert(binary_form=True)
            cert = ssock.getpeercert()

    if cert_bin:
        # Short fingerprint: first 16 hex chars of the SHA-256 of the DER cert.
        found["fingerprint"] = hashlib.sha256(cert_bin).hexdigest()[:16]
    if cert:
        found.update(_cert_summary(cert))
    return found


def _inspect(host: str, port: int) -> dict:
    """Blocking TLS connection to extract cert and cipher info.

    Args:
        host: Host name or IP address to connect to (also sent as SNI name).
        port: TCP port to connect to.

    Returns:
        A dict with the keys ``error``, ``version``, ``cipher``, ``bits``,
        ``subject``, ``issuer``, ``san``, ``not_before``, ``not_after``,
        ``days_left``, ``serial`` and ``fingerprint``.  On failure ``error``
        holds the exception text and the other keys keep their defaults.

    Note:
        ``SSLSocket.getpeercert()`` only returns a decoded certificate when
        the chain was validated; with ``CERT_NONE`` it is always ``{}``.
        The handshake is therefore attempted with chain verification first
        (hostname matching stays off, so a name mismatch still yields
        details).  If the chain is not trusted, a second, non-verifying
        handshake reports version, cipher and fingerprint only.
    """
    result: dict = {
        "error": "",
        "version": "",
        "cipher": "",
        "bits": 0,
        "subject": "",
        "issuer": "",
        "san": [],
        "not_before": "",
        "not_after": "",
        "days_left": 0,
        "serial": "",
        "fingerprint": "",
    }

    verifying = ssl.create_default_context()
    verifying.check_hostname = False

    permissive = ssl.create_default_context()
    permissive.check_hostname = False
    permissive.verify_mode = ssl.CERT_NONE

    try:
        try:
            result.update(_handshake(verifying, host, port))
        except ssl.SSLCertVerificationError:
            # Untrusted chain (self-signed, expired, unknown CA, ...): retry
            # without verification so the session details are still reported.
            result.update(_handshake(permissive, host, port))
    except (OSError, ValueError) as exc:
        # ssl.SSLError is an OSError subclass.  ValueError covers host names
        # the ssl/socket layer rejects outright (e.g. a leading dot or an
        # invalid IDNA label), which would otherwise escape to the caller.
        result["error"] = str(exc)
    return result
@command("tlscheck", help="TLS inspect: !tlscheck <host> [port]")
async def cmd_tlscheck(bot, message):
    """Inspect TLS version, cipher, and certificate details.

    Usage:
        !tlscheck example.com
        !tlscheck 10.0.0.1 8443
    """
    # parts[0] is the command word, parts[1] the host, parts[2] the port.
    parts = message.text.split(None, 3)
    if len(parts) < 2:
        await bot.reply(message, "Usage: !tlscheck <host> [port]")
        return

    host = parts[1]
    port = 443
    if len(parts) > 2:
        try:
            port = int(parts[2])
            if not 1 <= port <= 65535:
                raise ValueError
        except ValueError:
            await bot.reply(message, f"Invalid port: {parts[2]}")
            return

    # _inspect blocks on socket I/O, so run it in the default executor
    # instead of on the event loop.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _inspect, host, port)

    if result["error"]:
        await bot.reply(message, f"{host}:{port} -> TLS error: {result['error']}")
        return

    info = [f"{result['version']} {result['cipher']} ({result['bits']}bit)"]
    if result["subject"]:
        info.append(f"CN={result['subject']}")
    if result["issuer"]:
        info.append(f"issuer={result['issuer']}")

    # days_left == 0 means "less than a day left", not "unknown", so also key
    # on the presence of an expiry date rather than on days_left alone.
    days_left = result["days_left"]
    if result["not_after"] or days_left:
        if days_left >= 0:
            info.append(f"expires in {days_left}d")
        else:
            info.append(f"EXPIRED {-days_left}d ago")

    if result["san"]:
        # Show at most five names and summarise the remainder as a count.
        san_str = ", ".join(result["san"][:5])
        if len(result["san"]) > 5:
            san_str += f" (+{len(result['san']) - 5})"
        info.append(f"SAN: {san_str}")

    await bot.reply(message, f"{host}:{port} -> {' | '.join(info)}")