proxywatchd: add ssl_only mode and schedule improvements
- ssl_only mode: skip the secondary check when the SSL handshake fails
- _build_due_sql(): unified query for proxies due for testing
- working_checktime / fail_retry_interval: new schedule formula
- fail_retry_backoff: linear backoff option for failing proxies
This commit is contained in:
280
proxywatchd.py
280
proxywatchd.py
@@ -96,6 +96,41 @@ def _sample_dbg(msg, proxy=None, force=False):
|
||||
prefix = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
|
||||
_log('%s%s' % (prefix, msg), 'diag')
|
||||
|
||||
|
||||
def _build_due_sql():
    """Build an SQL WHERE clause selecting proxies that are due for testing.

    Returns:
        (sql_condition, params) tuple using the new schedule formula:
        - failed=0: tested + working_checktime < now
        - failed>0 with backoff: tested + (failed * fail_retry_interval) < now
        - failed>0 without backoff: tested + fail_retry_interval < now

    NOTE: `now` is captured at build time — rebuild before each query so the
    cutoff does not go stale.
    """
    now = time.time()
    # The two modes differ only in the delay expression applied to failing
    # proxies; everything else (template and parameter order) is identical,
    # so build the clause once instead of duplicating it per branch.
    if config.watchd.fail_retry_backoff:
        # Linear backoff: multiply interval by failure count
        retry_expr = '(failed * ?)'
    else:
        # Fixed interval: same delay regardless of failure count
        retry_expr = '?'
    sql = '''failed >= 0 AND failed < ? AND (
        tested IS NULL OR
        CASE WHEN failed = 0
            THEN tested + ? < ?
            ELSE tested + %s < ?
        END
    )''' % retry_expr
    # Placeholder order matches the template above:
    # max_fail, working_checktime, now, fail_retry_interval, now
    params = (config.watchd.max_fail, config.watchd.working_checktime, now,
              config.watchd.fail_retry_interval, now)
    return sql, params
|
||||
|
||||
|
||||
# IP pattern for judge services (validates response contains valid IP in body).
# Each octet is restricted to 0-255 so strings like 999.999.999.999 no longer
# match; a dotted quad of in-range octets is still found anywhere in the body.
IP_PATTERN = r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}'
|
||||
|
||||
@@ -588,7 +623,11 @@ class TargetTestJob(object):
|
||||
result = self._try_ssl_handshake(protos, pool)
|
||||
if result is not None:
|
||||
return result # SSL succeeded or MITM detected
|
||||
# SSL failed for all protocols, continue to secondary check
|
||||
# SSL failed for all protocols
|
||||
if config.watchd.ssl_only:
|
||||
# ssl_only mode: skip secondary check, mark as failed
|
||||
_dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
|
||||
return (None, None, 0, None, None, 1, 0, 'ssl_only')
|
||||
_dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
|
||||
|
||||
# Phase 2: Secondary check (configured checktype)
|
||||
@@ -852,6 +891,174 @@ class WorkerThread():
|
||||
avg_t = try_div(duration_total, job_count)
|
||||
_log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)
|
||||
|
||||
|
||||
class VerificationThread(threading.Thread):
    """Manager thread that verifies disputed/suspicious proxy results.

    Pulls from verification_queue, tests proxies directly via Tor,
    records authoritative results, and updates worker trust scores.

    Runs as a daemon thread; `stop()` signals the loop to exit and `run()`
    wakes at most `interval` seconds later.
    """

    def __init__(self, database, interval=30, batch_size=10):
        threading.Thread.__init__(self)
        # Path/identifier of the proxy database (opened fresh each cycle).
        self.database = database
        # Seconds to sleep between verification cycles.
        self.interval = interval
        # Max number of pending verifications processed per cycle.
        self.batch_size = batch_size
        # Daemon so the thread never blocks interpreter shutdown.
        self.daemon = True
        # Event used both as stop flag and as interruptible sleep.
        self._stop = threading.Event()
        # Lifetime counters, reported when the thread terminates.
        self.verified_count = 0
        self.disagreements_resolved = 0

    def stop(self):
        # Request loop exit; also interrupts the inter-cycle wait immediately.
        self._stop.set()

    def run(self):
        """Main loop: run one verification cycle per interval until stopped."""
        _log('verification thread started (interval=%ds, batch=%d)' % (
            self.interval, self.batch_size), 'verify')

        while not self._stop.is_set():
            try:
                self._verification_cycle()
            except Exception as e:
                # Broad catch on purpose: a single bad cycle must not kill
                # the manager thread; log and try again next interval.
                _log('verification cycle error: %s' % e, 'error')

            # Wait for next cycle (returns early if stop() is called)
            self._stop.wait(self.interval)

        _log('verification thread stopped (verified=%d, resolved=%d)' % (
            self.verified_count, self.disagreements_resolved), 'verify')

    def _verification_cycle(self):
        """Process pending verifications.

        Opens its own DB connection (connections are not shared across
        threads), drains up to `batch_size` queue items, and commits once
        at the end of the batch.
        """
        db = mysqlite.mysqlite(self.database, str)
        try:
            pending = dbs.get_pending_verifications(db, self.batch_size)
            if not pending:
                return

            for item in pending:
                # Honor stop requests mid-batch.
                if self._stop.is_set():
                    break

                # Items are dicts with at least 'proxy' and 'trigger' keys.
                proxy = item['proxy']
                trigger = item['trigger']

                # Test proxy directly
                result = self._test_proxy(proxy)

                if result is None:
                    # Test failed/inconclusive, skip for now
                    # (item stays queued and will be retried next cycle).
                    continue

                # Record verification result
                self._record_verification(db, item, result)
                self.verified_count += 1

                if trigger == 'disagreement':
                    self.disagreements_resolved += 1

            # Single commit for the whole batch.
            db.commit()

        finally:
            db.close()

    def _test_proxy(self, proxy):
        """Test proxy directly using local Tor connection.

        Args:
            proxy: "ip:port" string identifying the proxy under test.

        Returns:
            True if proxy works, False if fails, None if inconclusive
            (malformed proxy string or no Tor pool available).
        """
        parts = proxy.split(':')
        if len(parts) != 2:
            return None

        # ip/port are parsed only to validate the format; the connection
        # below uses the original `proxy` string, not these values.
        ip, port_str = parts
        try:
            port = int(port_str)
        except ValueError:
            return None

        # Try SSL handshake as primary test
        pool = connection_pool.get_pool()
        if not pool:
            return None

        # Map protocol names to rocksock proxy-type constants.
        proto_map = {
            'http': rocksock.RS_PT_HTTP,
            'socks4': rocksock.RS_PT_SOCKS4,
            'socks5': rocksock.RS_PT_SOCKS5
        }
        # Try most common protocols first; first successful handshake wins.
        protos = ['http', 'socks5', 'socks4']

        for proto in protos:
            try:
                # Chain: local Tor (socks5) -> proxy under test -> TLS target.
                sock = rocksock.Rocksock(
                    host='www.google.com',
                    port=443,
                    ssl=True,
                    timeout=config.common.timeout_connect,
                    proxies=[
                        (tor_proxy_url(pool.get_host(), proto='socks5'), rocksock.RS_PT_SOCKS5),
                        (proxy, proto_map.get(proto, rocksock.RS_PT_HTTP))
                    ]
                )
                sock.connect()
                sock.disconnect()
                return True
            except Exception:
                # This protocol failed; fall through to the next one.
                continue

        # All protocols failed: authoritative negative result.
        return False

    def _record_verification(self, db, item, result):
        """Record verification result and update worker trust.

        Args:
            db: open database connection (caller commits).
            item: verification-queue entry; may carry worker_a/worker_b and
                  their reported result_a/result_b (0/1 ints).
            result: authoritative boolean outcome from _test_proxy().
        """
        proxy = item['proxy']
        trigger = item['trigger']
        worker_a = item.get('worker_a')
        worker_b = item.get('worker_b')
        result_a = item.get('result_a')
        result_b = item.get('result_b')

        # Update worker trust based on who was correct
        if trigger == 'disagreement' and worker_a and worker_b:
            # Check which worker(s) agreed with verification
            result_int = 1 if result else 0

            if result_a == result_int:
                dbs.update_worker_trust(db, worker_a, True)
            else:
                dbs.update_worker_trust(db, worker_a, False)

            if result_b == result_int:
                dbs.update_worker_trust(db, worker_b, True)
            else:
                dbs.update_worker_trust(db, worker_b, False)

        elif worker_a:
            # Single worker trigger (resurrection/sudden_death)
            result_int = 1 if result else 0
            was_correct = (result_a == result_int)
            dbs.update_worker_trust(db, worker_a, was_correct)

        # Update proxy status with authoritative result
        if result:
            db.execute('''
                UPDATE proxylist SET failed = 0, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))
        else:
            # NOTE(review): failure counter is incremented without an upper
            # cap here — confirm max_fail pruning happens elsewhere.
            db.execute('''
                UPDATE proxylist SET failed = failed + 1, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))

        # Remove from verification queue
        dbs.remove_from_verification_queue(db, proxy)

        _log('verified %s: %s (trigger=%s)' % (
            proxy, 'PASS' if result else 'FAIL', trigger), 'verify')
|
||||
|
||||
|
||||
class Proxywatchd():
|
||||
|
||||
def stop(self):
|
||||
@@ -867,6 +1074,8 @@ class Proxywatchd():
|
||||
self.submit_collected()
|
||||
if self.httpd_server:
|
||||
self.httpd_server.stop()
|
||||
if self.verification_thread:
|
||||
self.verification_thread.stop()
|
||||
self.stopped.set()
|
||||
|
||||
def finish(self):
|
||||
@@ -929,6 +1138,7 @@ class Proxywatchd():
|
||||
self.stats = Stats()
|
||||
self.last_cleanup = time.time()
|
||||
self.httpd_server = None
|
||||
self.verification_thread = None
|
||||
|
||||
# Dynamic thread scaling
|
||||
min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
|
||||
@@ -990,9 +1200,10 @@ class Proxywatchd():
|
||||
_log('checktype changed (%s -> %s), requires restart to take effect' % (
|
||||
','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn')
|
||||
|
||||
_log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % (
|
||||
_log('config reloaded: threads=%d timeout=%d working=%ds fail_interval=%ds backoff=%s' % (
|
||||
config.watchd.threads, config.watchd.timeout,
|
||||
config.watchd.checktime, config.watchd.max_fail), 'watchd')
|
||||
config.watchd.working_checktime, config.watchd.fail_retry_interval,
|
||||
config.watchd.fail_retry_backoff), 'watchd')
|
||||
return True
|
||||
|
||||
def _spawn_thread(self):
|
||||
@@ -1036,21 +1247,35 @@ class Proxywatchd():
|
||||
def fetch_rows(self, db):
    """Fetch proxy rows due for testing from database.

    Uses the unified schedule condition from _build_due_sql(). When fewer
    due rows than worker threads exist and `oldies` is enabled, falls back
    to a second query that gives dead proxies (failed >= max_fail) another
    chance on a simple fixed-interval schedule.

    The pre-refactor query (checktime/perfail_checktime based) was still
    being built and executed before its result was discarded; that dead
    code and its wasted DB round-trip are removed here.

    Args:
        db: open database connection.

    Returns:
        List of proxy rows (tuples) selected for testing, in random order.
    """
    self.isoldies = False

    # Build due condition using new schedule formula
    due_sql, due_params = _build_due_sql()
    q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
           consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
    _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
        config.watchd.working_checktime, config.watchd.fail_retry_interval,
        config.watchd.fail_retry_backoff, config.watchd.max_fail))
    rows = db.execute(q, due_params).fetchall()
    _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))

    # Check oldies? (dead proxies getting a second chance)
    if len(rows) < config.watchd.threads:
        _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
        rows = []
        if config.watchd.oldies:
            self.isoldies = True
            # Disable tor safeguard for old proxies
            if self.tor_safeguard:
                self.tor_safeguard = False
            # Oldies use simple checktime-based query (dead proxies)
            now = time.time()
            # Upper bound: allow up to max_fail * 1.5 failures before giving up.
            oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
            q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
                          mitm,consecutive_success,asn,proxy FROM proxylist
                          WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
                          ORDER BY RANDOM()'''
            rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
                                         config.watchd.oldies_checktime, now)).fetchall()
    return rows
|
||||
|
||||
def prepare_jobs(self):
|
||||
@@ -1431,18 +1656,16 @@ class Proxywatchd():
|
||||
|
||||
def _run(self):
|
||||
_log('starting...', 'watchd')
|
||||
_log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
|
||||
_log('config: db=%s checktype=%s threads=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % (
|
||||
config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads,
|
||||
config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')
|
||||
config.watchd.working_checktime, config.watchd.fail_retry_interval,
|
||||
config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd')
|
||||
|
||||
# Log database status at startup
|
||||
with self._db_context() as db:
|
||||
total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
|
||||
now = time.time()
|
||||
due = db.execute(
|
||||
'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
|
||||
(config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||
).fetchone()[0]
|
||||
due_sql, due_params = _build_due_sql()
|
||||
due = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + due_sql, due_params).fetchone()[0]
|
||||
|
||||
# Create stats persistence tables
|
||||
dbs.create_table_if_not_exists(db, 'stats_history')
|
||||
@@ -1464,7 +1687,14 @@ class Proxywatchd():
|
||||
|
||||
# Start HTTP API server if enabled
|
||||
if config.httpd.enabled:
|
||||
from httpd import ProxyAPIServer
|
||||
from httpd import ProxyAPIServer, configure_schedule
|
||||
# Pass schedule config to httpd module
|
||||
configure_schedule(
|
||||
config.watchd.working_checktime,
|
||||
config.watchd.fail_retry_interval,
|
||||
config.watchd.fail_retry_backoff,
|
||||
config.watchd.max_fail
|
||||
)
|
||||
self.httpd_server = ProxyAPIServer(
|
||||
config.httpd.listenip,
|
||||
config.httpd.port,
|
||||
@@ -1473,6 +1703,16 @@ class Proxywatchd():
|
||||
)
|
||||
self.httpd_server.start()
|
||||
|
||||
# Start verification thread if enabled (manager-only, checks disputed results)
|
||||
if config.verification.enabled and config.watchd.threads > 0:
|
||||
self.verification_thread = VerificationThread(
|
||||
database=config.watchd.database,
|
||||
interval=config.verification.interval,
|
||||
batch_size=config.verification.batch_size
|
||||
)
|
||||
self.verification_thread.start()
|
||||
_log('verification thread enabled', 'watchd')
|
||||
|
||||
# create worker threads with shared queues
|
||||
for i in range(config.watchd.threads):
|
||||
threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
|
||||
|
||||
Reference in New Issue
Block a user