"""Plugin: AbuseIPDB IP reputation check and reporting.""" from __future__ import annotations import asyncio import ipaddress import json import logging import os import urllib.parse import urllib.request from derp.http import urlopen as _urlopen from derp.plugin import command log = logging.getLogger(__name__) _API_BASE = "https://api.abuseipdb.com/api/v2" _MAX_BATCH = 5 def _get_api_key(bot) -> str: """Resolve API key from environment or config.""" return (os.environ.get("ABUSEIPDB_API_KEY", "") or bot.config.get("abuseipdb", {}).get("api_key", "")) def _validate_ip(addr: str) -> str | None: """Validate IP address. Returns error string or None if valid.""" try: ip = ipaddress.ip_address(addr) except ValueError: return f"invalid IP: {addr}" if ip.is_private or ip.is_loopback: return f"{addr}: private/loopback address" return None def _check_ip(api_key: str, addr: str) -> dict: """Query AbuseIPDB check endpoint (blocking).""" url = f"{_API_BASE}/check?{urllib.parse.urlencode({'ipAddress': addr, 'maxAgeInDays': 90})}" req = urllib.request.Request(url, headers={ "Key": api_key, "Accept": "application/json", "User-Agent": "derp-bot", }) with _urlopen(req, timeout=15) as resp: return json.loads(resp.read()) def _report_ip(api_key: str, addr: str, categories: str, comment: str) -> dict: """Submit abuse report (blocking).""" data = urllib.parse.urlencode({ "ip": addr, "categories": categories, "comment": comment, }).encode() req = urllib.request.Request(f"{_API_BASE}/report", data=data, headers={ "Key": api_key, "Accept": "application/json", "User-Agent": "derp-bot", }) with _urlopen(req, timeout=15) as resp: return json.loads(resp.read()) def _format_check(addr: str, data: dict) -> str: """Format check response into single-line output.""" d = data.get("data", {}) score = d.get("abuseConfidenceScore", "?") reports = d.get("totalReports", 0) isp = d.get("isp", "?") usage = d.get("usageType", "?") country = d.get("countryCode", "?") return (f"{addr} -- Abuse: {score}% ({reports} reports) | ISP: {isp}" f" | Usage: {usage} | Country: {country}") @command("abuse", help="AbuseIPDB: !abuse [ip2 ...] | !abuse report ") async def cmd_abuse(bot, message): """Check or report IP addresses via AbuseIPDB. Usage: !abuse 8.8.8.8 Check single IP !abuse 8.8.8.8 1.1.1.1 Check multiple IPs (max 5) !abuse 8.8.8.8 report 14,22 Brute force Report IP (admin) """ parts = message.text.split() if len(parts) < 2: usage = "Usage: !abuse [ip2 ...] | !abuse report " await bot.reply(message, usage) return api_key = _get_api_key(bot) if not api_key: await bot.reply(message, "AbuseIPDB API key not configured") return # Detect report mode: !abuse report if len(parts) >= 5 and parts[2].lower() == "report": if not bot._is_admin(message): await bot.reply(message, "Permission denied: reporting requires admin") return addr = parts[1] err = _validate_ip(addr) if err: await bot.reply(message, err) return categories = parts[3] comment = " ".join(parts[4:]) loop = asyncio.get_running_loop() try: result = await loop.run_in_executor( None, _report_ip, api_key, addr, categories, comment, ) score = result.get("data", {}).get("abuseConfidenceScore", "?") await bot.reply(message, f"{addr} -- reported (confidence: {score}%)") except Exception as exc: await bot.reply(message, f"{addr} -- report failed: {exc}") return # Check mode: collect IPs from arguments addrs = parts[1:1 + _MAX_BATCH] # Validate all for addr in addrs: err = _validate_ip(addr) if err: await bot.reply(message, err) return loop = asyncio.get_running_loop() async def _query(addr: str) -> str: try: result = await loop.run_in_executor(None, _check_ip, api_key, addr) return _format_check(addr, result) except Exception as exc: return f"{addr} -- error: {exc}" results = await asyncio.gather(*[_query(a) for a in addrs]) await bot.long_reply(message, list(results), label="abuse check")