Files
derp/plugins/tcping.py
user e3bb793574 feat: add canary, tcping, archive, resolve plugins
canary: generate realistic fake credentials (token/aws/basic) for
planting as canary tripwires. Per-channel state persistence.

tcping: TCP connect latency probe through SOCKS5 proxy with
min/avg/max reporting. Proxy-compatible alternative to traceroute.

archive: save URLs to Wayback Machine via Save Page Now API,
routed through SOCKS5 proxy.

resolve: bulk DNS resolution (up to 10 hosts) via TCP DNS through
SOCKS5 proxy with concurrent asyncio.gather.

83 new tests (1010 total), docs updated.
2026-02-20 19:38:10 +01:00

117 lines
3.3 KiB
Python

"""Plugin: TCP connect latency probe (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import ipaddress
import time
from derp.http import open_connection as _open_connection
from derp.plugin import command
_TIMEOUT = 10.0
_MAX_COUNT = 10
_DEFAULT_COUNT = 3
_DEFAULT_PORT = 443
def _is_internal(host: str) -> bool:
"""Check if host is a private/reserved address."""
try:
ip = ipaddress.ip_address(host)
return ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local
except ValueError:
return False
def _validate_host(host: str) -> bool:
"""Check that host is an IP or looks like a domain."""
try:
ipaddress.ip_address(host)
return True
except ValueError:
pass
return "." in host and all(c.isalnum() or c in ".-" for c in host)
async def _probe(host: str, port: int, timeout: float) -> float | None:
"""Single TCP connect probe. Returns RTT in ms or None on failure."""
t0 = time.perf_counter()
try:
_, writer = await asyncio.wait_for(
_open_connection(host, port, timeout=timeout), timeout=timeout,
)
rtt = (time.perf_counter() - t0) * 1000
writer.close()
await writer.wait_closed()
return rtt
except (OSError, asyncio.TimeoutError, TimeoutError):
return None
@command("tcping", help="TCP latency: !tcping <host> [port] [count]")
async def cmd_tcping(bot, message):
"""Measure TCP connect latency to a host:port through SOCKS5 proxy.
Usage:
!tcping example.com (port 443, 3 probes)
!tcping example.com 22 (port 22, 3 probes)
!tcping example.com 80 5 (port 80, 5 probes)
"""
parts = message.text.split()
if len(parts) < 2:
await bot.reply(message, "Usage: !tcping <host> [port] [count]")
return
host = parts[1]
if not _validate_host(host):
await bot.reply(message, f"Invalid host: {host}")
return
if _is_internal(host):
await bot.reply(message, f"Refused: {host} is an internal/reserved address")
return
port = _DEFAULT_PORT
count = _DEFAULT_COUNT
if len(parts) > 2:
try:
port = int(parts[2])
if port < 1 or port > 65535:
raise ValueError
except ValueError:
await bot.reply(message, f"Invalid port: {parts[2]}")
return
if len(parts) > 3:
try:
count = int(parts[3])
count = max(1, min(count, _MAX_COUNT))
except ValueError:
pass
results: list[float | None] = []
for _ in range(count):
rtt = await _probe(host, port, _TIMEOUT)
results.append(rtt)
successes = [r for r in results if r is not None]
if not successes:
await bot.reply(message, f"tcping {host}:{port} -- {count} probes, all timed out")
return
probe_strs = []
for i, r in enumerate(results, 1):
probe_strs.append(f"{i}: {r:.0f}ms" if r is not None else f"{i}: timeout")
mn = min(successes)
avg = sum(successes) / len(successes)
mx = max(successes)
header = f"tcping {host}:{port} -- {count} probes"
probes = " ".join(probe_strs)
summary = f"min/avg/max: {mn:.0f}/{avg:.0f}/{mx:.0f} ms"
await bot.reply(message, f"{header} {probes} {summary}")