"""Plugin: check IPs against local Tor exit node list.""" from __future__ import annotations import ipaddress import logging import time from pathlib import Path from derp.plugin import command log = logging.getLogger(__name__) _EXIT_LIST_PATHS = [ Path("data/tor-exit-nodes.txt"), Path("/var/lib/tor/exit-nodes.txt"), ] _MAX_AGE = 86400 # Refresh if older than 24h _TOR_EXIT_URL = "https://check.torproject.org/torbulkexitlist" _exits: set[str] = set() _loaded_at: float = 0 def _load_exits() -> set[str]: """Load exit node IPs from local file.""" for path in _EXIT_LIST_PATHS: if path.is_file(): try: nodes = set() for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue try: ipaddress.ip_address(line) nodes.add(line) except ValueError: continue log.info("torcheck: loaded %d exit nodes from %s", len(nodes), path) return nodes except OSError: continue return set() def _refresh_if_stale() -> None: """Reload the exit list if it hasn't been loaded or is stale.""" global _exits, _loaded_at now = time.monotonic() if _exits and (now - _loaded_at) < _MAX_AGE: return nodes = _load_exits() if nodes: _exits = nodes _loaded_at = now async def _download_exits() -> int: """Download the Tor bulk exit list. Returns count of nodes fetched.""" import asyncio import urllib.request loop = asyncio.get_running_loop() def _fetch(): req = urllib.request.Request(_TOR_EXIT_URL, headers={"User-Agent": "derp-bot"}) with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310 return resp.read().decode("utf-8", errors="replace") try: text = await loop.run_in_executor(None, _fetch) except Exception as exc: log.error("torcheck: download failed: %s", exc) return -1 nodes = set() for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue try: ipaddress.ip_address(line) nodes.add(line) except ValueError: continue if not nodes: return 0 # Write to first candidate path dest = _EXIT_LIST_PATHS[0] dest.parent.mkdir(parents=True, exist_ok=True) dest.write_text("\n".join(sorted(nodes)) + "\n") log.info("torcheck: saved %d exit nodes to %s", len(nodes), dest) global _exits, _loaded_at _exits = nodes _loaded_at = time.monotonic() return len(nodes) @command("tor", help="Tor exit check: !tor ") async def cmd_tor(bot, message): """Check if an IP is a known Tor exit node. Usage: !tor 1.2.3.4 Check IP against exit list !tor update Download latest exit node list """ parts = message.text.split(None, 2) if len(parts) < 2: await bot.reply(message, "Usage: !tor ") return arg = parts[1].strip() if arg == "update": await bot.reply(message, "Downloading Tor exit list...") count = await _download_exits() if count < 0: await bot.reply(message, "Failed to download exit list") elif count == 0: await bot.reply(message, "Downloaded empty list") else: await bot.reply(message, f"Updated: {count} exit nodes") return try: ip = ipaddress.ip_address(arg) except ValueError: await bot.reply(message, f"Invalid IP address: {arg}") return _refresh_if_stale() if not _exits: await bot.reply(message, "No exit list loaded (run !tor update)") return addr = str(ip) if addr in _exits: await bot.reply(message, f"{addr}: Tor exit node ({len(_exits)} nodes in list)") else: await bot.reply(message, f"{addr}: not a known Tor exit ({len(_exits)} nodes in list)")