From af5e1ce4b0412706bbdae26961ec9a92cfb3d766 Mon Sep 17 00:00:00 2001 From: Username Date: Sat, 20 Dec 2025 23:02:26 +0100 Subject: [PATCH] proxywatchd: integrate tor connection pool --- proxywatchd.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/proxywatchd.py b/proxywatchd.py index 5a7ed08..1c6f936 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -16,6 +16,7 @@ from config import Config import mysqlite from misc import _log, categorize_error import rocksock +import connection_pool config = Config() @@ -255,10 +256,11 @@ class TargetTestJob(): Multiple TargetTestJob instances share the same ProxyTestState, allowing tests to be interleaved with other proxies in the queue. """ - def __init__(self, proxy_state, target_srv, checktype): + def __init__(self, proxy_state, target_srv, checktype, worker_id=None): self.proxy_state = proxy_state self.target_srv = target_srv self.checktype = checktype + self.worker_id = worker_id def run(self): """Test the proxy against this job's target server.""" @@ -307,8 +309,14 @@ class TargetTestJob(): protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] last_error_category = None + # Get Tor host from pool (with worker affinity) + pool = connection_pool.get_pool() + for proto in protos: - torhost = random.choice(config.torhosts) + if pool: + torhost = pool.get_tor_host(self.worker_id) + else: + torhost = random.choice(config.torhosts) if proto == 'socks4': srv = socks4_resolve(srvname, server_port) else: @@ -331,6 +339,9 @@ class TargetTestJob(): sock.send('NICK\n') else: sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) + # Record success in pool + if pool: + pool.record_success(torhost, time.time() - duration) return sock, proto, duration, torhost, srvname, 0, use_ssl, None except rocksock.RocksockException as e: @@ -350,6 +361,9 @@ class TargetTestJob(): err == rocksock.RS_E_HIT_TIMEOUT): break elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: + # Tor connection failed - record in pool + if pool: + pool.record_failure(torhost) if random.randint(0, (config.watchd.threads - 1) / 2) == 0: _log("could not connect to tor, sleep 5s", "ERROR") time.sleep(5) @@ -388,6 +402,8 @@ class WorkerThread(): except Queue.Empty: continue nao = time.time() + # Assign worker ID for connection pool affinity + job.worker_id = self.id job.run() spent = time.time() - nao job_count += 1 @@ -626,6 +642,9 @@ class Proxywatchd(): def _run(self): _log('starting...', 'watchd') + # Initialize Tor connection pool + connection_pool.init_pool(config.torhosts, warmup=True) + # Start HTTP API server if enabled if config.httpd.enabled: from httpd import ProxyAPIServer @@ -682,6 +701,10 @@ class Proxywatchd(): # periodic stats report if self.stats.should_report(config.watchd.stats_interval): _log(self.stats.report(), 'stats') + # Also report pool stats + pool = connection_pool.get_pool() + if pool: + _log(pool.status_line(), 'stats') # periodic stale proxy cleanup (daily) if (time.time() - self.last_cleanup) >= 86400: