"""Plugin: Shodan InternetDB -- free host reconnaissance (no API key).""" from __future__ import annotations import asyncio import ipaddress import json import logging from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) _API_URL = "https://internetdb.shodan.io" _TIMEOUT = 15 def _fetch(addr: str) -> dict | None: """Fetch InternetDB data for an IP address. Returns parsed JSON dict, or None on 404 (no data). Raises on network/server errors. """ import urllib.error try: resp = _urlopen(f"{_API_URL}/{addr}", timeout=_TIMEOUT) return json.loads(resp.read()) except urllib.error.HTTPError as exc: if exc.code == 404: return None raise def _format_result(addr: str, data: dict) -> str: """Format InternetDB response into a compact IRC message.""" lines = [] hostnames = data.get("hostnames", []) if hostnames: lines.append(f"{addr} -- {', '.join(hostnames[:5])}") else: lines.append(addr) ports = data.get("ports", []) if ports: lines.append(f"Ports: {', '.join(str(p) for p in sorted(ports))}") cpes = data.get("cpes", []) if cpes: lines.append(f"CPEs: {', '.join(cpes[:8])}") tags = data.get("tags", []) if tags: lines.append(f"Tags: {', '.join(tags)}") vulns = data.get("vulns", []) if vulns: shown = vulns[:10] suffix = f" (+{len(vulns) - 10} more)" if len(vulns) > 10 else "" lines.append(f"CVEs: {', '.join(shown)}{suffix}") return " | ".join(lines) @command("internetdb", help="Shodan InternetDB: !internetdb ") async def cmd_internetdb(bot, message): """Look up host information from Shodan InternetDB. Returns open ports, hostnames, CPEs, tags, and known CVEs. Free API, no key required. Usage: !internetdb 8.8.8.8 """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !internetdb ") return addr = parts[1].strip() try: ip = ipaddress.ip_address(addr) except ValueError: await bot.reply(message, f"Invalid IP address: {addr}") return if ip.is_private or ip.is_loopback: await bot.reply(message, f"{addr}: private/loopback address") return loop = asyncio.get_running_loop() try: data = await loop.run_in_executor(None, _fetch, str(ip)) except Exception as exc: log.error("internetdb: lookup failed for %s: %s", addr, exc) await bot.reply(message, f"{addr}: lookup failed ({exc})") return if data is None: await bot.reply(message, f"{addr}: no data available") return await bot.reply(message, _format_result(str(ip), data))