"""Plugin: TCP connect latency probe (SOCKS5-proxied).""" from __future__ import annotations import asyncio import ipaddress import time from derp.http import open_connection as _open_connection from derp.plugin import command _TIMEOUT = 10.0 _MAX_COUNT = 10 _DEFAULT_COUNT = 3 _DEFAULT_PORT = 443 def _is_internal(host: str) -> bool: """Check if host is a private/reserved address.""" try: ip = ipaddress.ip_address(host) return ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local except ValueError: return False def _validate_host(host: str) -> bool: """Check that host is an IP or looks like a domain.""" try: ipaddress.ip_address(host) return True except ValueError: pass return "." in host and all(c.isalnum() or c in ".-" for c in host) async def _probe(host: str, port: int, timeout: float) -> float | None: """Single TCP connect probe. Returns RTT in ms or None on failure.""" t0 = time.perf_counter() try: _, writer = await asyncio.wait_for( _open_connection(host, port, timeout=timeout), timeout=timeout, ) rtt = (time.perf_counter() - t0) * 1000 writer.close() await writer.wait_closed() return rtt except (OSError, asyncio.TimeoutError, TimeoutError): return None @command("tcping", help="TCP latency: !tcping [port] [count]") async def cmd_tcping(bot, message): """Measure TCP connect latency to a host:port through SOCKS5 proxy. Usage: !tcping example.com (port 443, 3 probes) !tcping example.com 22 (port 22, 3 probes) !tcping example.com 80 5 (port 80, 5 probes) """ parts = message.text.split() if len(parts) < 2: await bot.reply(message, "Usage: !tcping [port] [count]") return host = parts[1] if not _validate_host(host): await bot.reply(message, f"Invalid host: {host}") return if _is_internal(host): await bot.reply(message, f"Refused: {host} is an internal/reserved address") return port = _DEFAULT_PORT count = _DEFAULT_COUNT if len(parts) > 2: try: port = int(parts[2]) if port < 1 or port > 65535: raise ValueError except ValueError: await bot.reply(message, f"Invalid port: {parts[2]}") return if len(parts) > 3: try: count = int(parts[3]) count = max(1, min(count, _MAX_COUNT)) except ValueError: pass results: list[float | None] = [] for _ in range(count): rtt = await _probe(host, port, _TIMEOUT) results.append(rtt) successes = [r for r in results if r is not None] if not successes: await bot.reply(message, f"tcping {host}:{port} -- {count} probes, all timed out") return probe_strs = [] for i, r in enumerate(results, 1): probe_strs.append(f"{i}: {r:.0f}ms" if r is not None else f"{i}: timeout") mn = min(successes) avg = sum(successes) / len(successes) mx = max(successes) header = f"tcping {host}:{port} -- {count} probes" probes = " ".join(probe_strs) summary = f"min/avg/max: {mn:.0f}/{avg:.0f}/{mx:.0f} ms" await bot.reply(message, f"{header} {probes} {summary}")