#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Tor connection pooling for improved proxy testing throughput.

This module provides connection pooling to reduce the overhead of
establishing Tor circuits for each proxy test.

Key features:
- Worker-Tor affinity (consistent host assignment per thread)
- Connection pre-warming (establish circuits at startup)
- Health monitoring (detect and skip unresponsive Tor hosts)
- Usage statistics for monitoring
"""

import threading
import time
import random
import socket

try:
    import Queue
except ImportError:
    import queue as Queue

import rocksock
from misc import _log


class TorHostState(object):
    """Track state and health of a single Tor SOCKS host.

    All mutable fields are guarded by ``self.lock``.  The lock is a
    plain (non-reentrant) ``threading.Lock``, so the public methods
    must never call one another while holding it.
    """

    # Backoff after the first failure, in seconds.
    BASE_BACKOFF = 5
    # Longest backoff applied after repeated failures, in seconds.
    MAX_BACKOFF = 60

    def __init__(self, host_addr):
        self.host_addr = host_addr
        self.lock = threading.Lock()
        # Health tracking
        self.consecutive_failures = 0
        self.last_success = 0
        self.last_failure = 0
        self.total_successes = 0
        self.total_failures = 0
        # Timestamp before which the host should not be used (0 = usable).
        self.backoff_until = 0
        # Latency tracking: rolling window of the last max_samples values.
        self.latency_samples = []
        self.max_samples = 20

    def record_success(self, latency=0):
        """Record a successful connection through this host.

        Resets the failure streak and clears any pending backoff.

        Args:
            latency: Connection setup time in seconds; ignored if <= 0.
        """
        with self.lock:
            self.consecutive_failures = 0
            self.last_success = time.time()
            self.total_successes += 1
            self.backoff_until = 0
            if latency > 0:
                self.latency_samples.append(latency)
                if len(self.latency_samples) > self.max_samples:
                    self.latency_samples.pop(0)

    def record_failure(self):
        """Record a failed connection and extend the backoff window.

        Exponential backoff: 5s, 10s, 20s, 40s, capped at 60s.  The
        exponent is clamped so a long failure streak cannot build a
        needlessly large intermediate ``2 ** n`` value.
        """
        with self.lock:
            self.consecutive_failures += 1
            self.last_failure = time.time()
            self.total_failures += 1
            # Clamp exponent: 5 * 2**4 == 80 already exceeds the 60s cap.
            exponent = min(self.consecutive_failures - 1, 4)
            delay = min(self.BASE_BACKOFF * (2 ** exponent), self.MAX_BACKOFF)
            self.backoff_until = time.time() + delay

    def is_available(self):
        """Return True if the host is not currently in backoff."""
        with self.lock:
            return time.time() >= self.backoff_until

    def avg_latency(self):
        """Return the average connection latency in seconds (0 if no data)."""
        with self.lock:
            if not self.latency_samples:
                return 0
            return sum(self.latency_samples) / len(self.latency_samples)

    def success_rate(self):
        """Return the lifetime success rate as a percentage (100 if unused)."""
        with self.lock:
            total = self.total_successes + self.total_failures
            if total == 0:
                return 100.0
            return (float(self.total_successes) / total) * 100

    def snapshot(self):
        """Return a dict snapshot of this host's statistics.

        All values are read under a single lock acquisition, so the
        counters in the result are mutually consistent -- unlike calling
        the individual accessors one after another.
        """
        with self.lock:
            total = self.total_successes + self.total_failures
            if total == 0:
                rate = 100.0
            else:
                rate = (float(self.total_successes) / total) * 100
            if self.latency_samples:
                avg = sum(self.latency_samples) / len(self.latency_samples)
            else:
                avg = 0
            return {
                'host': self.host_addr,
                'available': time.time() >= self.backoff_until,
                'success_rate': rate,
                'avg_latency': avg,
                'total_successes': self.total_successes,
                'total_failures': self.total_failures,
            }


class TorConnectionPool(object):
    """Pool of Tor SOCKS connections with health monitoring.

    Provides:
    - Consistent worker-to-host assignment (circuit affinity)
    - Health-based host selection
    - Connection pre-warming
    - Usage statistics

    NOTE: host states are keyed by address, so duplicate entries in
    ``tor_hosts`` share a single state object.
    """

    def __init__(self, tor_hosts, warmup=True, warmup_target='www.google.com'):
        """Initialize the connection pool.

        Args:
            tor_hosts: List of Tor SOCKS addresses (host:port)
            warmup: Whether to pre-warm connections at startup
            warmup_target: Target host for warmup connections
        """
        self.tor_hosts = tor_hosts
        self.host_states = {}
        # Guards worker_assignments.
        self.lock = threading.Lock()

        # Worker affinity mapping: thread_id -> tor_host
        self.worker_assignments = {}

        # Aggregate statistics, guarded by stats_lock.
        self.stats_lock = threading.Lock()
        self.total_requests = 0
        self.total_successes = 0
        self.warmup_complete = False

        # Initialize host states
        for host in tor_hosts:
            self.host_states[host] = TorHostState(host)

        # Pre-warm connections (blocking: runs in the caller's thread).
        if warmup and len(tor_hosts) > 0:
            self._warmup_connections(warmup_target)

    def _warmup_connections(self, target_host, target_port=80, timeout=10):
        """Pre-warm Tor circuits by making one test connection per host.

        Best-effort: failures are recorded (starting that host's
        backoff) and logged, never raised.

        Args:
            target_host: Host to connect to through each Tor proxy.
            target_port: Port on the target host.
            timeout: Per-connection timeout in seconds.
        """
        _log('warming up %d tor host(s)...' % len(self.tor_hosts), 'pool')
        warmed = 0

        for host_addr in self.tor_hosts:
            try:
                start = time.time()
                proxy = rocksock.RocksockProxyFromURL('socks5://%s' % host_addr)
                sock = rocksock.Rocksock(
                    host=target_host,
                    port=target_port,
                    proxies=[proxy],
                    timeout=timeout
                )
                sock.connect()
                sock.disconnect()
                elapsed = time.time() - start
                self.host_states[host_addr].record_success(elapsed)
                warmed += 1
            except Exception as e:
                self.host_states[host_addr].record_failure()
                _log('warmup failed for %s: %s' % (host_addr, str(e)), 'debug')

        self.warmup_complete = True
        _log('warmup complete: %d/%d hosts ready' % (warmed, len(self.tor_hosts)), 'pool')

    def get_tor_host(self, worker_id=None):
        """Get a Tor host for a worker, with affinity and health-awareness.

        Selection order:
        1. The worker's previously assigned host, if still available.
        2. A random pick from the healthier half of the available hosts
           (sorted by success rate, then latency).
        3. If every host is in backoff, a uniformly random host anyway
           (no affinity is recorded for this fallback).

        Args:
            worker_id: Optional worker identifier for consistent assignment

        Returns:
            Tor host address string, or None if no hosts are configured.
        """
        with self.stats_lock:
            self.total_requests += 1

        # Honor an existing affinity assignment while the host is healthy.
        if worker_id is not None:
            with self.lock:
                if worker_id in self.worker_assignments:
                    assigned = self.worker_assignments[worker_id]
                    if self.host_states[assigned].is_available():
                        return assigned

        # Collect available hosts with their quality metrics.
        available = []
        for host_addr, state in self.host_states.items():
            if state.is_available():
                available.append((host_addr, state.success_rate(), state.avg_latency()))

        if not available:
            # All hosts in backoff: return a random one anyway rather
            # than stalling the caller.
            return random.choice(self.tor_hosts) if self.tor_hosts else None

        # Sort by success rate (desc), then latency (asc).
        available.sort(key=lambda x: (-x[1], x[2]))

        # Pick from the top half to balance quality against spreading load.
        top_count = max(1, len(available) // 2)
        chosen = random.choice(available[:top_count])[0]

        # Record affinity for future calls by the same worker.
        if worker_id is not None:
            with self.lock:
                self.worker_assignments[worker_id] = chosen

        return chosen

    def record_success(self, host_addr, latency=0):
        """Record successful use of a Tor host.

        Unknown addresses are silently ignored so callers need not
        track pool membership.

        Args:
            host_addr: The Tor SOCKS address that was used.
            latency: Optional connection latency in seconds.
        """
        if host_addr in self.host_states:
            self.host_states[host_addr].record_success(latency)
            with self.stats_lock:
                self.total_successes += 1

    def record_failure(self, host_addr):
        """Record failed use of a Tor host (unknown addresses ignored)."""
        if host_addr in self.host_states:
            self.host_states[host_addr].record_failure()

    def get_stats(self):
        """Get pool statistics.

        Returns:
            Dict with keys: total_requests, total_successes,
            success_rate, available_hosts, total_hosts,
            warmup_complete, and hosts (list of per-host dicts).
        """
        with self.stats_lock:
            total = self.total_requests
            successes = self.total_successes

        # Per-host data is read via snapshot() so each host's counters
        # are internally consistent (read under that host's lock).
        host_stats = [state.snapshot() for state in self.host_states.values()]
        available_count = sum(1 for h in host_stats if h['available'])

        return {
            'total_requests': total,
            'total_successes': successes,
            'success_rate': (float(successes) / total * 100) if total > 0 else 0,
            'available_hosts': available_count,
            'total_hosts': len(self.tor_hosts),
            'warmup_complete': self.warmup_complete,
            'hosts': host_stats,
        }

    def status_line(self):
        """Get a compact status line for logging."""
        stats = self.get_stats()
        return 'tor pool: %d/%d hosts, %.1f%% success, %d requests' % (
            stats['available_hosts'],
            stats['total_hosts'],
            stats['success_rate'],
            stats['total_requests']
        )


# Global pool instance (initialized by proxywatchd)
_pool = None


def init_pool(tor_hosts, warmup=True):
    """Initialize the global connection pool.

    Args:
        tor_hosts: List of Tor SOCKS addresses (host:port).
        warmup: Whether to pre-warm connections at startup.

    Returns:
        The newly created TorConnectionPool (also stored globally).
    """
    global _pool
    _pool = TorConnectionPool(tor_hosts, warmup=warmup)
    return _pool


def get_pool():
    """Get the global connection pool (None before init_pool is called)."""
    return _pool