Add PySocks dependency and shared src/derp/http.py module providing proxy-aware urlopen() and build_opener() that route through socks5h://127.0.0.1:1080. Subclassed SocksiPyHandler passes SSL context through to HTTPS connections. Swapped 14 external-facing plugins to use the proxied helpers. Local-only traffic (SearXNG, raw DNS/TLS sockets) stays direct. Updated test mocks in test_twitch and test_alert accordingly.
147 lines
4.1 KiB
Python
147 lines
4.1 KiB
Python
"""Plugin: check IPs against local Tor exit node list."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command
|
|
|
|
# Logger for this plugin, named after the module.
log = logging.getLogger(__name__)

# Candidate exit-list files, tried in this order when loading; the first
# entry is also where a downloaded list is saved.
_EXIT_LIST_PATHS = [
    Path(location)
    for location in (
        "data/tor-exit-nodes.txt",
        "/var/lib/tor/exit-nodes.txt",
    )
]

_MAX_AGE = 24 * 60 * 60  # Refresh if older than 24h
_TOR_EXIT_URL = "https://check.torproject.org/torbulkexitlist"

# In-memory cache: the known exit addresses and the time.monotonic() reading
# taken when they were last loaded.
_exits: set[str] = set()
_loaded_at: float = 0
|
|
|
|
|
|
def _load_exits() -> set[str]:
    """Load exit node IPs from the first readable local list file.

    The candidates in ``_EXIT_LIST_PATHS`` are tried in order. Blank lines,
    lines starting with ``#`` and lines that are not valid IP addresses are
    skipped.

    Addresses are stored in the canonical form produced by
    ``str(ipaddress.ip_address(...))`` so that a lookup with a canonicalised
    address matches however the file spells it (for example upper-case or
    uncompressed IPv6).

    Returns:
        The addresses found in the first candidate file that could be read
        (possibly empty), or an empty set when no candidate exists or can be
        read.
    """
    for path in _EXIT_LIST_PATHS:
        if not path.is_file():
            continue
        try:
            # Decode explicitly and leniently, as the downloader does: a stray
            # undecodable byte should cost one line, not raise out of here.
            text = path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Unreadable candidate: fall through to the next one.
            continue

        nodes: set[str] = set()
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            try:
                # Store the canonical spelling, not the raw line.
                nodes.add(str(ipaddress.ip_address(line)))
            except ValueError:
                continue
        log.info("torcheck: loaded %d exit nodes from %s", len(nodes), path)
        return nodes
    return set()
|
|
|
|
|
|
def _refresh_if_stale() -> None:
    """Reload the exit list from disk unless a fresh copy is already cached.

    Nothing happens while a non-empty list younger than ``_MAX_AGE`` seconds
    is held in memory. Otherwise the list is re-read via ``_load_exits()``;
    an empty result leaves the current cache and its timestamp untouched.
    """
    global _exits, _loaded_at

    checked_at = time.monotonic()
    cache_is_fresh = bool(_exits) and (checked_at - _loaded_at) < _MAX_AGE
    if not cache_is_fresh:
        reloaded = _load_exits()
        if reloaded:
            # Stamp with the time of the check, taken before the reload.
            _exits, _loaded_at = reloaded, checked_at
|
|
|
|
|
|
async def _download_exits() -> int:
    """Download the Tor bulk exit list and make it the active list.

    The list is fetched from ``_TOR_EXIT_URL`` in the default executor so the
    blocking HTTP call does not run on the event loop. Blank lines, ``#``
    comments and lines that are not valid IP addresses are dropped, and the
    remaining addresses are stored in canonical
    ``str(ipaddress.ip_address(...))`` form.

    A non-empty result replaces the in-memory list and is written, sorted one
    address per line, to the first entry of ``_EXIT_LIST_PATHS``. Failing to
    write that file is logged but does not discard the downloaded list.

    Returns:
        The number of exit nodes fetched, ``0`` if the response contained no
        valid addresses, or ``-1`` if the download itself failed.
    """
    import asyncio
    import urllib.request

    global _exits, _loaded_at

    def _fetch() -> str:
        req = urllib.request.Request(_TOR_EXIT_URL, headers={"User-Agent": "derp-bot"})
        with _urlopen(req, timeout=30) as resp:
            return resp.read().decode("utf-8", errors="replace")

    loop = asyncio.get_running_loop()
    try:
        text = await loop.run_in_executor(None, _fetch)
    except Exception as exc:
        # Any fetch failure is reported to the caller as -1 instead of raising.
        log.error("torcheck: download failed: %s", exc)
        return -1

    nodes: set[str] = set()
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        try:
            # Store the canonical spelling, not the raw line.
            nodes.add(str(ipaddress.ip_address(line)))
        except ValueError:
            continue

    if not nodes:
        # Nothing usable: keep whatever list is already on disk and in memory.
        return 0

    # Write to first candidate path
    dest = _EXIT_LIST_PATHS[0]
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text("\n".join(sorted(nodes)) + "\n")
    except OSError as exc:
        # The download succeeded; a failed save should not throw it away.
        log.warning("torcheck: could not save exit list to %s: %s", dest, exc)
    else:
        log.info("torcheck: saved %d exit nodes to %s", len(nodes), dest)

    _exits = nodes
    _loaded_at = time.monotonic()
    return len(nodes)
|
|
|
|
|
|
@command("tor", help="Tor exit check: !tor <ip|update>")
async def cmd_tor(bot, message):
    """Report whether an IP is in the known Tor exit node list.

    Usage:
        !tor 1.2.3.4    Check IP against exit list
        !tor update     Download latest exit node list
    """
    tokens = message.text.split(None, 2)
    if len(tokens) < 2:
        await bot.reply(message, "Usage: !tor <ip|update>")
        return

    target = tokens[1].strip()

    # Sub-command: fetch a new list and report how that went.
    if target == "update":
        await bot.reply(message, "Downloading Tor exit list...")
        fetched = await _download_exits()
        if fetched > 0:
            outcome = f"Updated: {fetched} exit nodes"
        elif fetched == 0:
            outcome = "Downloaded empty list"
        else:
            outcome = "Failed to download exit list"
        await bot.reply(message, outcome)
        return

    # Anything else has to parse as an IP address.
    try:
        parsed = ipaddress.ip_address(target)
    except ValueError:
        await bot.reply(message, f"Invalid IP address: {target}")
        return

    _refresh_if_stale()
    if not _exits:
        await bot.reply(message, "No exit list loaded (run !tor update)")
        return

    # Look up by the address's canonical string form.
    addr = str(parsed)
    if addr in _exits:
        verdict = f"{addr}: Tor exit node ({len(_exits)} nodes in list)"
    else:
        verdict = f"{addr}: not a known Tor exit ({len(_exits)} nodes in list)"
    await bot.reply(message, verdict)
|