proxywatchd: integrate tor connection pool
This commit is contained in:
@@ -16,6 +16,7 @@ from config import Config
|
|||||||
import mysqlite
|
import mysqlite
|
||||||
from misc import _log, categorize_error
|
from misc import _log, categorize_error
|
||||||
import rocksock
|
import rocksock
|
||||||
|
import connection_pool
|
||||||
|
|
||||||
config = Config()
|
config = Config()
|
||||||
|
|
||||||
@@ -255,10 +256,11 @@ class TargetTestJob():
|
|||||||
Multiple TargetTestJob instances share the same ProxyTestState,
|
Multiple TargetTestJob instances share the same ProxyTestState,
|
||||||
allowing tests to be interleaved with other proxies in the queue.
|
allowing tests to be interleaved with other proxies in the queue.
|
||||||
"""
|
"""
|
||||||
def __init__(self, proxy_state, target_srv, checktype):
|
def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
|
||||||
self.proxy_state = proxy_state
|
self.proxy_state = proxy_state
|
||||||
self.target_srv = target_srv
|
self.target_srv = target_srv
|
||||||
self.checktype = checktype
|
self.checktype = checktype
|
||||||
|
self.worker_id = worker_id
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Test the proxy against this job's target server."""
|
"""Test the proxy against this job's target server."""
|
||||||
@@ -307,8 +309,14 @@ class TargetTestJob():
|
|||||||
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
|
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
|
||||||
last_error_category = None
|
last_error_category = None
|
||||||
|
|
||||||
|
# Get Tor host from pool (with worker affinity)
|
||||||
|
pool = connection_pool.get_pool()
|
||||||
|
|
||||||
for proto in protos:
|
for proto in protos:
|
||||||
torhost = random.choice(config.torhosts)
|
if pool:
|
||||||
|
torhost = pool.get_tor_host(self.worker_id)
|
||||||
|
else:
|
||||||
|
torhost = random.choice(config.torhosts)
|
||||||
if proto == 'socks4':
|
if proto == 'socks4':
|
||||||
srv = socks4_resolve(srvname, server_port)
|
srv = socks4_resolve(srvname, server_port)
|
||||||
else:
|
else:
|
||||||
@@ -331,6 +339,9 @@ class TargetTestJob():
|
|||||||
sock.send('NICK\n')
|
sock.send('NICK\n')
|
||||||
else:
|
else:
|
||||||
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
||||||
|
# Record success in pool
|
||||||
|
if pool:
|
||||||
|
pool.record_success(torhost, time.time() - duration)
|
||||||
return sock, proto, duration, torhost, srvname, 0, use_ssl, None
|
return sock, proto, duration, torhost, srvname, 0, use_ssl, None
|
||||||
|
|
||||||
except rocksock.RocksockException as e:
|
except rocksock.RocksockException as e:
|
||||||
@@ -350,6 +361,9 @@ class TargetTestJob():
|
|||||||
err == rocksock.RS_E_HIT_TIMEOUT):
|
err == rocksock.RS_E_HIT_TIMEOUT):
|
||||||
break
|
break
|
||||||
elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
|
elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
|
||||||
|
# Tor connection failed - record in pool
|
||||||
|
if pool:
|
||||||
|
pool.record_failure(torhost)
|
||||||
if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
|
if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
|
||||||
_log("could not connect to tor, sleep 5s", "ERROR")
|
_log("could not connect to tor, sleep 5s", "ERROR")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
@@ -388,6 +402,8 @@ class WorkerThread():
|
|||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
continue
|
continue
|
||||||
nao = time.time()
|
nao = time.time()
|
||||||
|
# Assign worker ID for connection pool affinity
|
||||||
|
job.worker_id = self.id
|
||||||
job.run()
|
job.run()
|
||||||
spent = time.time() - nao
|
spent = time.time() - nao
|
||||||
job_count += 1
|
job_count += 1
|
||||||
@@ -626,6 +642,9 @@ class Proxywatchd():
|
|||||||
def _run(self):
|
def _run(self):
|
||||||
_log('starting...', 'watchd')
|
_log('starting...', 'watchd')
|
||||||
|
|
||||||
|
# Initialize Tor connection pool
|
||||||
|
connection_pool.init_pool(config.torhosts, warmup=True)
|
||||||
|
|
||||||
# Start HTTP API server if enabled
|
# Start HTTP API server if enabled
|
||||||
if config.httpd.enabled:
|
if config.httpd.enabled:
|
||||||
from httpd import ProxyAPIServer
|
from httpd import ProxyAPIServer
|
||||||
@@ -682,6 +701,10 @@ class Proxywatchd():
|
|||||||
# periodic stats report
|
# periodic stats report
|
||||||
if self.stats.should_report(config.watchd.stats_interval):
|
if self.stats.should_report(config.watchd.stats_interval):
|
||||||
_log(self.stats.report(), 'stats')
|
_log(self.stats.report(), 'stats')
|
||||||
|
# Also report pool stats
|
||||||
|
pool = connection_pool.get_pool()
|
||||||
|
if pool:
|
||||||
|
_log(pool.status_line(), 'stats')
|
||||||
|
|
||||||
# periodic stale proxy cleanup (daily)
|
# periodic stale proxy cleanup (daily)
|
||||||
if (time.time() - self.last_cleanup) >= 86400:
|
if (time.time() - self.last_cleanup) >= 86400:
|
||||||
|
|||||||
Reference in New Issue
Block a user