"""Plugin: VirusTotal API v3 lookup for hashes, IPs, domains, and URLs.""" from __future__ import annotations import asyncio import base64 import json import logging import os import re import time import urllib.request from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) _API_BASE = "https://www.virustotal.com/api/v3" _RATE_WINDOW = 60 _RATE_LIMIT = 4 # Module-level rate tracking _request_times: list[float] = [] # Detection regexes _RE_MD5 = re.compile(r"^[0-9a-fA-F]{32}$") _RE_SHA1 = re.compile(r"^[0-9a-fA-F]{40}$") _RE_SHA256 = re.compile(r"^[0-9a-fA-F]{64}$") _RE_URL = re.compile(r"^https?://", re.IGNORECASE) _RE_IP = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$") _RE_DOMAIN = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$") def _get_api_key(bot) -> str: """Resolve API key from environment or config.""" return (os.environ.get("VIRUSTOTAL_API_KEY", "") or bot.config.get("virustotal", {}).get("api_key", "")) def _rate_check() -> bool: """Check rate limit. Returns True if request is allowed.""" now = time.monotonic() # Prune old timestamps _request_times[:] = [t for t in _request_times if (now - t) < _RATE_WINDOW] if len(_request_times) >= _RATE_LIMIT: return False _request_times.append(now) return True def _detect_type(query: str) -> tuple[str, str]: """Detect input type. Returns (type, endpoint_path) or raises ValueError.""" if _RE_MD5.match(query) or _RE_SHA1.match(query) or _RE_SHA256.match(query): return "file", f"/files/{query.lower()}" if _RE_URL.match(query): url_id = base64.urlsafe_b64encode(query.encode()).decode().rstrip("=") return "url", f"/urls/{url_id}" if _RE_IP.match(query): return "ip", f"/ip_addresses/{query}" if _RE_DOMAIN.match(query): return "domain", f"/domains/{query.lower()}" raise ValueError(f"cannot determine type for: {query}") def _vt_request(api_key: str, path: str) -> dict: """Make VT API request (blocking).""" req = urllib.request.Request(f"{_API_BASE}{path}", headers={ "x-apikey": api_key, "Accept": "application/json", "User-Agent": "derp-bot", }) with _urlopen(req, timeout=20) as resp: return json.loads(resp.read()) def _format_file(query: str, data: dict) -> str: """Format file/hash result.""" attrs = data.get("data", {}).get("attributes", {}) stats = attrs.get("last_analysis_stats", {}) malicious = stats.get("malicious", 0) total = sum(stats.values()) tags = attrs.get("popular_threat_classification", {}) labels = [] for entry in tags.get("suggested_threat_label", [])[:3]: labels.append(entry.get("value", "")) for entry in tags.get("popular_threat_category", [])[:2]: val = entry.get("value", "") if val and val not in labels: labels.append(val) first_seen = attrs.get("first_submission_date", "") if isinstance(first_seen, int): from datetime import datetime, timezone first_seen = datetime.fromtimestamp(first_seen, tz=timezone.utc).strftime("%Y-%m-%d") short_hash = query[:16] + "..." if len(query) > 16 else query parts = [f"{short_hash} -- {malicious}/{total} detected"] if labels: parts.append(", ".join(labels)) if first_seen: parts.append(f"first seen: {first_seen}") return " | ".join(parts) def _format_ip(query: str, data: dict) -> str: """Format IP address result.""" attrs = data.get("data", {}).get("attributes", {}) stats = attrs.get("last_analysis_stats", {}) malicious = stats.get("malicious", 0) total = sum(stats.values()) asn = attrs.get("asn", "?") as_owner = attrs.get("as_owner", "?") country = attrs.get("country", "?") reputation = attrs.get("reputation", 0) return (f"{query} -- {malicious}/{total} | AS{asn} {as_owner}" f" | Country: {country} | Reputation: {reputation}") def _format_domain(query: str, data: dict) -> str: """Format domain result.""" attrs = data.get("data", {}).get("attributes", {}) stats = attrs.get("last_analysis_stats", {}) malicious = stats.get("malicious", 0) total = sum(stats.values()) registrar = attrs.get("registrar", "?") reputation = attrs.get("reputation", 0) categories = attrs.get("categories", {}) cats = list(set(categories.values()))[:3] parts = [f"{query} -- {malicious}/{total}"] if cats: parts.append(", ".join(cats)) parts.append(f"Registrar: {registrar}") parts.append(f"Reputation: {reputation}") return " | ".join(parts) def _format_url(query: str, data: dict) -> str: """Format URL result.""" attrs = data.get("data", {}).get("attributes", {}) stats = attrs.get("last_analysis_stats", {}) malicious = stats.get("malicious", 0) total = sum(stats.values()) title = attrs.get("title", "") final_url = attrs.get("last_final_url", query) parts = [f"{final_url} -- {malicious}/{total}"] if title: parts.append(title[:60]) return " | ".join(parts) _FORMATTERS = { "file": _format_file, "ip": _format_ip, "domain": _format_domain, "url": _format_url, } @command("vt", help="VirusTotal: !vt ") async def cmd_vt(bot, message): """Query VirusTotal API for file hashes, IPs, domains, or URLs. Usage: !vt 44d88612fea8a8f36de82e12... File hash (MD5/SHA1/SHA256) !vt 8.8.8.8 IP address !vt example.com Domain !vt https://example.com/page URL """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !vt ") return api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "VirusTotal API key not configured") return query = parts[1].strip() try: qtype, path = _detect_type(query) except ValueError as exc: await bot.reply(message, str(exc)) return if not _rate_check(): await bot.reply(message, "Rate limited (4 requests/min) -- try again shortly") return loop = asyncio.get_running_loop() try: result = await loop.run_in_executor(None, _vt_request, api_key, path) except urllib.request.HTTPError as exc: if exc.code == 404: await bot.reply(message, f"{query} -- not found in VirusTotal") elif exc.code == 429: await bot.reply(message, "VirusTotal API quota exceeded") else: await bot.reply(message, f"{query} -- API error: HTTP {exc.code}") return except Exception as exc: await bot.reply(message, f"{query} -- error: {exc}") return formatter = _FORMATTERS.get(qtype, _format_file) await bot.reply(message, formatter(query, result))