feat: add wave 2 plugins and --cprofile CLI flag
Add 7 new pure-stdlib plugins: whois (raw TCP port 43), portcheck (async TCP connect scan with internal-net guard), httpcheck (HTTP status/redirects/timing), tlscheck (TLS version/cipher/cert inspect), blacklist (parallel DNSBL check against 10 RBLs), rand (password/hex/ uuid/bytes/int/coin/dice), and timer (async countdown notifications). Add --cprofile flag to CLI for profiling bot runtime. Update all docs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
139
plugins/tlscheck.py
Normal file
139
plugins/tlscheck.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Plugin: TLS certificate and cipher inspector (pure stdlib)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import socket
|
||||
import ssl
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from derp.plugin import command
|
||||
|
||||
_TIMEOUT = 10
|
||||
|
||||
|
||||
def _inspect(host: str, port: int) -> dict:
|
||||
"""Blocking TLS connection to extract cert and cipher info."""
|
||||
result: dict = {
|
||||
"error": "",
|
||||
"version": "",
|
||||
"cipher": "",
|
||||
"bits": 0,
|
||||
"subject": "",
|
||||
"issuer": "",
|
||||
"san": [],
|
||||
"not_before": "",
|
||||
"not_after": "",
|
||||
"days_left": 0,
|
||||
"serial": "",
|
||||
"fingerprint": "",
|
||||
}
|
||||
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
try:
|
||||
with socket.create_connection((host, port), timeout=_TIMEOUT) as sock:
|
||||
with ctx.wrap_socket(sock, server_hostname=host) as ssock:
|
||||
result["version"] = ssock.version() or ""
|
||||
cipher = ssock.cipher()
|
||||
if cipher:
|
||||
result["cipher"] = cipher[0]
|
||||
result["bits"] = cipher[2]
|
||||
|
||||
cert_bin = ssock.getpeercert(binary_form=True)
|
||||
cert = ssock.getpeercert()
|
||||
|
||||
if cert_bin:
|
||||
result["fingerprint"] = hashlib.sha256(cert_bin).hexdigest()[:16]
|
||||
|
||||
if cert:
|
||||
# Subject CN
|
||||
for rdn in cert.get("subject", ()):
|
||||
for attr, val in rdn:
|
||||
if attr == "commonName":
|
||||
result["subject"] = val
|
||||
|
||||
# Issuer CN / O
|
||||
for rdn in cert.get("issuer", ()):
|
||||
for attr, val in rdn:
|
||||
if attr in ("commonName", "organizationName"):
|
||||
if not result["issuer"]:
|
||||
result["issuer"] = val
|
||||
|
||||
# SAN
|
||||
san = cert.get("subjectAltName", ())
|
||||
result["san"] = [v for t, v in san if t == "DNS"]
|
||||
|
||||
# Validity dates
|
||||
nb = cert.get("notBefore", "")
|
||||
na = cert.get("notAfter", "")
|
||||
result["not_before"] = nb
|
||||
result["not_after"] = na
|
||||
|
||||
if na:
|
||||
try:
|
||||
exp = datetime.strptime(na, "%b %d %H:%M:%S %Y %Z")
|
||||
exp = exp.replace(tzinfo=timezone.utc)
|
||||
delta = exp - datetime.now(timezone.utc)
|
||||
result["days_left"] = delta.days
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
result["serial"] = cert.get("serialNumber", "")
|
||||
|
||||
except (OSError, ssl.SSLError) as exc:
|
||||
result["error"] = str(exc)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@command("tlscheck", help="TLS inspect: !tlscheck <host> [port]")
|
||||
async def cmd_tlscheck(bot, message):
|
||||
"""Inspect TLS version, cipher, and certificate details.
|
||||
|
||||
Usage:
|
||||
!tlscheck example.com
|
||||
!tlscheck 10.0.0.1 8443
|
||||
"""
|
||||
parts = message.text.split(None, 3)
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !tlscheck <host> [port]")
|
||||
return
|
||||
|
||||
host = parts[1]
|
||||
port = 443
|
||||
if len(parts) > 2:
|
||||
try:
|
||||
port = int(parts[2])
|
||||
if not 1 <= port <= 65535:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
await bot.reply(message, f"Invalid port: {parts[2]}")
|
||||
return
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
result = await loop.run_in_executor(None, _inspect, host, port)
|
||||
|
||||
if result["error"]:
|
||||
await bot.reply(message, f"{host}:{port} -> TLS error: {result['error']}")
|
||||
return
|
||||
|
||||
info = [f"{result['version']} {result['cipher']} ({result['bits']}bit)"]
|
||||
|
||||
if result["subject"]:
|
||||
info.append(f"CN={result['subject']}")
|
||||
if result["issuer"]:
|
||||
info.append(f"issuer={result['issuer']}")
|
||||
if result["days_left"]:
|
||||
label = "expires" if result["days_left"] > 0 else "EXPIRED"
|
||||
info.append(f"{label} in {abs(result['days_left'])}d")
|
||||
if result["san"]:
|
||||
san_str = ", ".join(result["san"][:5])
|
||||
if len(result["san"]) > 5:
|
||||
san_str += f" (+{len(result['san']) - 5})"
|
||||
info.append(f"SAN: {san_str}")
|
||||
|
||||
await bot.reply(message, f"{host}:{port} -> {' | '.join(info)}")
|
||||
Reference in New Issue
Block a user