Files
derp/plugins/torcheck.py
user 97bbc6a825 feat: route plugin HTTP traffic through SOCKS5 proxy
Add PySocks dependency and shared src/derp/http.py module providing
proxy-aware urlopen() and build_opener() that route through
socks5h://127.0.0.1:1080. Subclassed SocksiPyHandler passes SSL
context through to HTTPS connections.

Swapped 14 external-facing plugins to use the proxied helpers.
Local-only traffic (SearXNG, raw DNS/TLS sockets) stays direct.
Updated test mocks in test_twitch and test_alert accordingly.
2026-02-15 15:53:49 +01:00

147 lines
4.1 KiB
Python

"""Plugin: check IPs against local Tor exit node list."""
from __future__ import annotations
import ipaddress
import logging
import time
from pathlib import Path
from derp.http import urlopen as _urlopen
from derp.plugin import command
log = logging.getLogger(__name__)

# On-disk locations searched for a cached exit list, in priority order.
# The first entry is also where a freshly downloaded list is written.
_EXIT_LIST_PATHS: list[Path] = [
    Path("data/tor-exit-nodes.txt"),
    Path("/var/lib/tor/exit-nodes.txt"),
]

# Seconds (24 hours) the in-memory list is trusted before it is re-read.
_MAX_AGE = 24 * 60 * 60

# Upstream source of the bulk exit list.
_TOR_EXIT_URL = "https://check.torproject.org/torbulkexitlist"

# In-memory cache of exit addresses, plus the time.monotonic() stamp of the
# moment it was last populated.
_exits: set[str] = set()
_loaded_at: float = 0
def _load_exits() -> set[str]:
    """Load exit node IPs from the first usable local exit-list file.

    The candidate paths in ``_EXIT_LIST_PATHS`` are tried in order. Blank
    lines, ``#`` comment lines and lines that are not valid IP addresses are
    skipped. Each address is stored in the canonical form produced by
    ``str(ipaddress.ip_address(...))`` so that a lookup with a normalised
    address (which is what ``cmd_tor`` performs) matches no matter how the
    file spells it (e.g. upper-case or uncompressed IPv6).

    Returns:
        The addresses parsed from the first candidate file that could be
        read (possibly empty), or an empty set when no candidate is readable.
    """
    for path in _EXIT_LIST_PATHS:
        if not path.is_file():
            continue
        try:
            # Decode leniently: a stray non-UTF-8 byte should cost one line,
            # not raise UnicodeDecodeError out of the loader.
            text = path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Unreadable candidate (permissions, removed meanwhile): try next.
            continue

        nodes: set[str] = set()
        for raw in text.splitlines():
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            try:
                nodes.add(str(ipaddress.ip_address(entry)))
            except ValueError:
                # Not an IP address; ignore the line.
                continue

        log.info("torcheck: loaded %d exit nodes from %s", len(nodes), path)
        return nodes
    return set()
def _refresh_if_stale() -> None:
    """Re-read the exit list from disk when the cache is empty or too old.

    The cache counts as fresh when it is non-empty and was populated less
    than ``_MAX_AGE`` seconds ago. When a reload yields no nodes, the
    existing cache and its timestamp are left as they are.
    """
    global _exits, _loaded_at

    checked_at = time.monotonic()
    cache_is_fresh = bool(_exits) and (checked_at - _loaded_at) < _MAX_AGE
    if not cache_is_fresh:
        reloaded = _load_exits()
        if reloaded:
            _exits = reloaded
            _loaded_at = checked_at
async def _download_exits() -> int:
    """Download the Tor bulk exit list and cache it in memory and on disk.

    The blocking HTTP request is run in the event loop's default executor.
    Blank lines, ``#`` comment lines and lines that are not valid IP
    addresses are skipped; addresses are stored in the canonical
    ``str(ipaddress.ip_address(...))`` form so they compare equal to the
    normalised address ``cmd_tor`` looks up.

    The in-memory cache is updated before the list is written to
    ``_EXIT_LIST_PATHS[0]``. A failure to write the file is logged and does
    not discard the downloaded list.

    Returns:
        The number of exit nodes fetched, ``0`` when the response held no
        valid address (the existing cache is left untouched), or ``-1`` when
        the download itself failed.
    """
    import asyncio
    import urllib.request

    global _exits, _loaded_at

    def _fetch() -> str:
        req = urllib.request.Request(_TOR_EXIT_URL, headers={"User-Agent": "derp-bot"})
        with _urlopen(req, timeout=30) as resp:
            return resp.read().decode("utf-8", errors="replace")

    loop = asyncio.get_running_loop()
    try:
        text = await loop.run_in_executor(None, _fetch)
    except Exception as exc:
        # Boundary handler: any fetch failure is reported to the caller as -1.
        log.error("torcheck: download failed: %s", exc)
        return -1

    nodes: set[str] = set()
    for raw in text.splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#"):
            continue
        try:
            nodes.add(str(ipaddress.ip_address(entry)))
        except ValueError:
            # Not an IP address; ignore the line.
            continue

    if not nodes:
        return 0

    # Publish in memory first so a failed disk write cannot lose the download.
    _exits = nodes
    _loaded_at = time.monotonic()

    # Persist to the first candidate path.
    dest = _EXIT_LIST_PATHS[0]
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text("\n".join(sorted(nodes)) + "\n", encoding="utf-8")
    except OSError as exc:
        log.warning("torcheck: could not save exit list to %s: %s", dest, exc)
    else:
        log.info("torcheck: saved %d exit nodes to %s", len(nodes), dest)

    return len(nodes)
@command("tor", help="Tor exit check: !tor <ip|update>")
async def cmd_tor(bot, message):
    """Report whether an IP is a known Tor exit node, or refresh the list.

    Usage:
        !tor 1.2.3.4    Check IP against exit list
        !tor update     Download latest exit node list
    """
    tokens = message.text.split(None, 2)
    if len(tokens) < 2:
        await bot.reply(message, "Usage: !tor <ip|update>")
        return

    target = tokens[1].strip()

    # Sub-command: fetch a fresh list and report the outcome.
    if target == "update":
        await bot.reply(message, "Downloading Tor exit list...")
        fetched = await _download_exits()
        if fetched > 0:
            outcome = f"Updated: {fetched} exit nodes"
        elif fetched == 0:
            outcome = "Downloaded empty list"
        else:
            outcome = "Failed to download exit list"
        await bot.reply(message, outcome)
        return

    # Otherwise the argument must be an IP address.
    try:
        parsed = ipaddress.ip_address(target)
    except ValueError:
        await bot.reply(message, f"Invalid IP address: {target}")
        return

    _refresh_if_stale()
    if not _exits:
        await bot.reply(message, "No exit list loaded (run !tor update)")
        return

    # Look up the normalised textual form of the address.
    addr = str(parsed)
    verdict = "Tor exit node" if addr in _exits else "not a known Tor exit"
    await bot.reply(message, f"{addr}: {verdict} ({len(_exits)} nodes in list)")