add tor connection pool with health monitoring

This commit is contained in:
Username
2025-12-20 23:02:21 +01:00
parent ce79ef7d7f
commit bc945a33ff

264
connection_pool.py Normal file
View File

@@ -0,0 +1,264 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Tor connection pooling for improved proxy testing throughput.
This module provides connection pooling to reduce the overhead of
establishing Tor circuits for each proxy test.
Key features:
- Worker-Tor affinity (consistent host assignment per thread)
- Connection pre-warming (establish circuits at startup)
- Health monitoring (detect and skip unresponsive Tor hosts)
- Usage statistics for monitoring
"""
import threading
import time
import random
import socket
try:
import Queue
except ImportError:
import queue as Queue
import rocksock
from misc import _log
class TorHostState(object):
    """Track state and health of a single Tor SOCKS host.

    Every mutable field is read and written under ``self.lock`` by the
    methods of this class.

    Attributes:
        host_addr: Address of the Tor SOCKS host this state describes.
        consecutive_failures: Failures since the last recorded success.
        last_success / last_failure: ``time.time()`` of the latest event
            (0 until one has been recorded).
        total_successes / total_failures: Lifetime counters.
        backoff_until: ``time.time()`` value before which the host is
            considered unavailable (0 means no backoff).
        latency_samples: Most recent positive latencies, oldest first.
        max_samples: Maximum number of latency samples retained.
    """

    # Backoff after the n-th consecutive failure is
    # BACKOFF_BASE * 2**(n-1) seconds, capped at BACKOFF_MAX
    # (5s, 10s, 20s, 40s, 60s, 60s, ...).
    BACKOFF_BASE = 5
    BACKOFF_MAX = 60
    # Upper bound on the exponent so a long failure streak never makes
    # us build an arbitrarily large integer just to clamp it afterwards.
    _MAX_BACKOFF_EXPONENT = 32

    def __init__(self, host_addr):
        self.host_addr = host_addr
        self.lock = threading.Lock()
        # Health tracking
        self.consecutive_failures = 0
        self.last_success = 0
        self.last_failure = 0
        self.total_successes = 0
        self.total_failures = 0
        # Backoff
        self.backoff_until = 0
        # Latency tracking (rolling average)
        self.latency_samples = []
        self.max_samples = 20

    def _backoff_delay(self, failures):
        """Return the backoff in seconds for ``failures`` consecutive failures."""
        exponent = min(max(failures - 1, 0), self._MAX_BACKOFF_EXPONENT)
        return min(self.BACKOFF_BASE * (2 ** exponent), self.BACKOFF_MAX)

    def record_success(self, latency=0):
        """Record a successful connection through this host.

        Clears the failure streak and any active backoff.

        Args:
            latency: Connection latency in seconds. Only positive values
                are added to the rolling window; 0/None are ignored.
        """
        with self.lock:
            self.consecutive_failures = 0
            self.last_success = time.time()
            self.total_successes += 1
            self.backoff_until = 0
            if latency is not None and latency > 0:
                self.latency_samples.append(latency)
                # Drop the oldest samples so the window never exceeds
                # max_samples (also copes with max_samples being lowered).
                overflow = len(self.latency_samples) - self.max_samples
                if overflow > 0:
                    del self.latency_samples[:overflow]

    def record_failure(self):
        """Record a failed connection and start/extend exponential backoff."""
        with self.lock:
            # Single timestamp so last_failure and backoff_until agree.
            now = time.time()
            self.consecutive_failures += 1
            self.last_failure = now
            self.total_failures += 1
            self.backoff_until = now + self._backoff_delay(
                self.consecutive_failures)

    def is_available(self):
        """Return True when the host is not currently in backoff."""
        with self.lock:
            return time.time() >= self.backoff_until

    def avg_latency(self):
        """Return the mean of the retained latency samples, or 0 if none."""
        with self.lock:
            if not self.latency_samples:
                return 0
            return sum(self.latency_samples) / float(len(self.latency_samples))

    def success_rate(self):
        """Return the success rate as a percentage.

        A host with no recorded attempts reports 100.0 so that untested
        hosts are not ranked below hosts with a known history.
        """
        with self.lock:
            total = self.total_successes + self.total_failures
            if total == 0:
                return 100.0
            return (float(self.total_successes) / total) * 100
class TorConnectionPool(object):
    """Pool of Tor SOCKS hosts with health monitoring.

    Provides:
    - Consistent worker-to-host assignment (circuit affinity)
    - Health-based host selection
    - Connection pre-warming
    - Usage statistics
    """

    def __init__(self, tor_hosts, warmup=True, warmup_target='www.google.com'):
        """Initialize the connection pool.

        Args:
            tor_hosts: List of Tor SOCKS addresses (host:port)
            warmup: Whether to pre-warm connections at startup. Warmup
                runs synchronously, one host after another, inside this
                constructor.
            warmup_target: Target host for warmup connections
        """
        # Private copy: host_states is built once from this list, so a
        # caller mutating its own list later must not desynchronise them.
        self.tor_hosts = list(tor_hosts or [])
        self.host_states = {}
        self.lock = threading.Lock()
        # Worker affinity mapping: worker_id -> tor_host
        self.worker_assignments = {}
        # Statistics
        self.stats_lock = threading.Lock()
        self.total_requests = 0
        self.total_successes = 0
        self.warmup_complete = False
        # Initialize host states
        for host in self.tor_hosts:
            self.host_states[host] = TorHostState(host)
        # Pre-warm connections
        if warmup and self.tor_hosts:
            self._warmup_connections(warmup_target)

    def _warmup_connections(self, target_host, target_port=80, timeout=10):
        """Pre-warm Tor circuits by making one test connection per host.

        Each host's TorHostState records a success (with the measured
        connect latency) or a failure. Failures are logged at 'debug'
        level and never propagate to the caller.

        Args:
            target_host: Host to connect to through each Tor host.
            target_port: Port on target_host.
            timeout: Timeout passed to rocksock.Rocksock.
        """
        _log('warming up %d tor host(s)...' % len(self.tor_hosts), 'pool')
        warmed = 0
        for host_addr in self.tor_hosts:
            state = self.host_states[host_addr]
            sock = None
            try:
                start = time.time()
                proxy = rocksock.RocksockProxyFromURL('socks5://%s' % host_addr)
                sock = rocksock.Rocksock(
                    host=target_host,
                    port=target_port,
                    proxies=[proxy],
                    timeout=timeout
                )
                sock.connect()
                elapsed = time.time() - start
            except Exception as e:
                state.record_failure()
                _log('warmup failed for %s: %s' % (host_addr, str(e)), 'debug')
            else:
                state.record_success(elapsed)
                warmed += 1
            finally:
                # Always release the socket, including when connect()
                # raised; previously the failure path never disconnected.
                # NOTE(review): assumes Rocksock.disconnect() is safe to
                # call after a failed connect() -- guarded just in case.
                if sock is not None:
                    try:
                        sock.disconnect()
                    except Exception as e:
                        _log('warmup disconnect failed for %s: %s'
                             % (host_addr, str(e)), 'debug')
        self.warmup_complete = True
        _log('warmup complete: %d/%d hosts ready' % (warmed, len(self.tor_hosts)), 'pool')

    def get_tor_host(self, worker_id=None):
        """Get a Tor host for a worker, with affinity and health-awareness.

        Selection order:
        1. The host previously assigned to worker_id, if it is not in
           backoff.
        2. A random pick from the better half of the available hosts,
           ranked by success rate (descending) then average latency
           (ascending). The pick becomes the worker's new assignment.
        3. If every host is in backoff, a random configured host is
           returned anyway (the assignment is left untouched).

        Args:
            worker_id: Optional worker identifier for consistent assignment

        Returns:
            Tor host address string, or None only when the pool was
            created with no hosts.
        """
        with self.stats_lock:
            self.total_requests += 1

        # Check for existing assignment with affinity
        if worker_id is not None:
            with self.lock:
                assigned = self.worker_assignments.get(worker_id)
            if assigned is not None and self.host_states[assigned].is_available():
                return assigned

        # Collect (host, success_rate, avg_latency) for hosts not in backoff
        available = []
        for host_addr, state in self.host_states.items():
            if state.is_available():
                available.append((host_addr, state.success_rate(), state.avg_latency()))

        if not available:
            # All hosts in backoff, return random one anyway
            return random.choice(self.tor_hosts) if self.tor_hosts else None

        # Sort by success rate (desc), then latency (asc)
        available.sort(key=lambda x: (-x[1], x[2]))
        # Pick from top performers with some randomness
        top_count = max(1, len(available) // 2)
        chosen = random.choice(available[:top_count])[0]

        # Record affinity
        if worker_id is not None:
            with self.lock:
                self.worker_assignments[worker_id] = chosen
        return chosen

    def record_success(self, host_addr, latency=0):
        """Record successful use of a Tor host.

        The pool-wide success counter is incremented even when host_addr
        is not a host known to this pool.
        """
        if host_addr in self.host_states:
            self.host_states[host_addr].record_success(latency)
        with self.stats_lock:
            self.total_successes += 1

    def record_failure(self, host_addr):
        """Record failed use of a Tor host (unknown hosts are ignored)."""
        if host_addr in self.host_states:
            self.host_states[host_addr].record_failure()

    def get_stats(self):
        """Get pool statistics.

        Returns:
            Dict with keys 'total_requests', 'total_successes',
            'success_rate' (percent of get_tor_host() calls followed by a
            recorded success), 'available_hosts', 'total_hosts',
            'warmup_complete' and 'hosts' (one dict per host).
        """
        with self.stats_lock:
            total = self.total_requests
            successes = self.total_successes

        host_stats = []
        for host_addr, state in self.host_states.items():
            host_stats.append({
                'host': host_addr,
                'available': state.is_available(),
                'success_rate': state.success_rate(),
                'avg_latency': state.avg_latency(),
                'total_successes': state.total_successes,
                'total_failures': state.total_failures,
            })
        # Derive the count from the same availability checks reported per
        # host, so the summary can never contradict the per-host flags.
        available_count = sum(1 for entry in host_stats if entry['available'])

        return {
            'total_requests': total,
            'total_successes': successes,
            'success_rate': (float(successes) / total * 100) if total > 0 else 0,
            'available_hosts': available_count,
            'total_hosts': len(self.tor_hosts),
            'warmup_complete': self.warmup_complete,
            'hosts': host_stats,
        }

    def status_line(self):
        """Get a compact status line for logging."""
        stats = self.get_stats()
        return 'tor pool: %d/%d hosts, %.1f%% success, %d requests' % (
            stats['available_hosts'],
            stats['total_hosts'],
            stats['success_rate'],
            stats['total_requests']
        )
# Global pool instance (initialized by proxywatchd)
_pool = None


def init_pool(tor_hosts, warmup=True, warmup_target=None):
    """Initialize the global connection pool.

    Replaces any previously created global pool.

    Args:
        tor_hosts: List of Tor SOCKS addresses (host:port).
        warmup: Whether to pre-warm connections while constructing the pool.
        warmup_target: Optional host used for warmup connections. When
            None, TorConnectionPool's own default target is used.

    Returns:
        The newly created TorConnectionPool.
    """
    global _pool
    options = {'warmup': warmup}
    # Only forward the target when given so the pool's default stays the
    # single source of truth for the fallback value.
    if warmup_target is not None:
        options['warmup_target'] = warmup_target
    _pool = TorConnectionPool(tor_hosts, **options)
    return _pool
def get_pool():
    """Return the module-wide pool, or None if init_pool() has not run yet."""
    return _pool