Files
derp/plugins/portcheck.py
user 1bdba0ea06 feat: route raw TCP traffic through SOCKS5 proxy
Add create_connection and open_connection helpers to the shared proxy
module, covering portcheck, whois, tlscheck, and crtsh live-cert check.
UDP-based plugins (dns, blacklist, subdomain) stay direct.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:01:17 +01:00

130 lines
4.1 KiB
Python

"""Plugin: async TCP port scanner (SOCKS5-proxied)."""
from __future__ import annotations
import asyncio
import ipaddress
import time
from derp.http import open_connection as _open_connection
from derp.plugin import command
_TIMEOUT = 3.0
_MAX_PORTS = 20
_COMMON_PORTS = [
21, 22, 23, 25, 53, 80, 110, 111, 135, 139,
143, 443, 445, 993, 995, 1433, 1723, 3306,
3389, 5432, 5900, 6379, 8080, 8443, 27017,
]
_PORT_NAMES = {
21: "ftp", 22: "ssh", 23: "telnet", 25: "smtp", 53: "dns",
80: "http", 110: "pop3", 111: "rpc", 135: "msrpc", 139: "netbios",
143: "imap", 443: "https", 445: "smb", 993: "imaps", 995: "pop3s",
1433: "mssql", 1723: "pptp", 3306: "mysql", 3389: "rdp",
5432: "postgres", 5900: "vnc", 6379: "redis", 8080: "http-alt",
8443: "https-alt", 27017: "mongo",
}
def _is_internal(host: str) -> bool:
"""Check if a host resolves to a private/reserved address."""
try:
ip = ipaddress.ip_address(host)
return ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local
except ValueError:
return False
def _validate_host(host: str) -> bool:
"""Check that host is an IP or looks like a domain."""
try:
ipaddress.ip_address(host)
return True
except ValueError:
pass
return "." in host and all(c.isalnum() or c in ".-" for c in host)
async def _check_port(host: str, port: int, timeout: float) -> tuple[int, bool, float]:
"""Try TCP connect. Returns (port, open, rtt_ms)."""
t0 = time.monotonic()
try:
_, writer = await asyncio.wait_for(
_open_connection(host, port, timeout=timeout), timeout=timeout,
)
rtt = (time.monotonic() - t0) * 1000
writer.close()
await writer.wait_closed()
return port, True, rtt
except (OSError, asyncio.TimeoutError, TimeoutError):
return port, False, 0.0
def _parse_ports(spec: str) -> list[int]:
"""Parse a port spec like '22,80,443' or '1-1024' or 'common'."""
if spec.lower() == "common":
return list(_COMMON_PORTS)
ports: list[int] = []
for part in spec.split(","):
part = part.strip()
if "-" in part:
lo, _, hi = part.partition("-")
lo_i, hi_i = int(lo), int(hi)
if lo_i < 1 or hi_i > 65535 or lo_i > hi_i:
continue
ports.extend(range(lo_i, min(hi_i + 1, lo_i + _MAX_PORTS + 1)))
else:
p = int(part)
if 1 <= p <= 65535:
ports.append(p)
return ports[:_MAX_PORTS]
@command("portcheck", help="TCP port scan: !portcheck <host> [ports|common]")
async def cmd_portcheck(bot, message):
"""Async TCP connect scan against a host.
Usage:
!portcheck 10.0.0.1 (top 25 common ports)
!portcheck 10.0.0.1 22,80,443 (specific ports)
!portcheck 10.0.0.1 1-100 (range, max 20)
"""
parts = message.text.split(None, 3)
if len(parts) < 2:
await bot.reply(message, "Usage: !portcheck <host> [ports|common]")
return
host = parts[1]
if not _validate_host(host):
await bot.reply(message, f"Invalid host: {host}")
return
if _is_internal(host):
await bot.reply(message, f"Refused: {host} is an internal/reserved address")
return
port_spec = parts[2] if len(parts) > 2 else "common"
try:
ports = _parse_ports(port_spec)
except ValueError:
await bot.reply(message, f"Invalid port spec: {port_spec}")
return
if not ports:
await bot.reply(message, "No valid ports to scan")
return
tasks = [_check_port(host, p, _TIMEOUT) for p in ports]
results = await asyncio.gather(*tasks)
open_ports = [(p, rtt) for p, is_open, rtt in results if is_open]
if open_ports:
lines = []
for port, rtt in sorted(open_ports):
name = _PORT_NAMES.get(port, "")
label = f"{port}/{name}" if name else str(port)
lines.append(f"{label} ({rtt:.0f}ms)")
await bot.reply(message, f"{host} open: {', '.join(lines)}")
else:
await bot.reply(message, f"{host}: no open ports found ({len(ports)} scanned)")