proxywatchd: fix division, uptime, and scaler
- Add `from __future__ import division` for correct percentages
- Remove start_time restoration so uptime reflects the current session
- Remove the success_rate threshold for scaling (scale on queue depth only)
This commit is contained in:
110
proxywatchd.py
110
proxywatchd.py
@@ -1,4 +1,5 @@
|
||||
#!/usr/bin/env python2
|
||||
from __future__ import division
|
||||
|
||||
# Gevent monkey-patching MUST happen before any other imports
|
||||
# This patches threading, socket, time, etc. for cooperative concurrency
|
||||
@@ -563,9 +564,7 @@ class Stats():
|
||||
self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0)
|
||||
self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0)
|
||||
self.peak_rate = state.get('peak_rate', 0.0)
|
||||
# Restore start_time to preserve uptime calculation
|
||||
if state.get('start_time'):
|
||||
self.start_time = state['start_time']
|
||||
# Note: start_time is NOT restored - uptime reflects current session
|
||||
# Restore failure categories
|
||||
if state.get('fail_categories'):
|
||||
self.fail_categories = dict(state['fail_categories'])
|
||||
@@ -767,18 +766,14 @@ class ThreadScaler(object):
|
||||
|
||||
target = current_threads
|
||||
|
||||
# Scale up: queue is deep and success rate is acceptable
|
||||
# Scale up: queue is deep - scale based on queue depth only
|
||||
# With greenlets, scale more aggressively (+5 instead of +2)
|
||||
if queue_size > current_threads * self.target_queue_per_thread:
|
||||
if success_rate >= self.success_threshold:
|
||||
target = min(current_threads + 5, ideal, self.max_threads)
|
||||
target = min(current_threads + 5, ideal, self.max_threads)
|
||||
|
||||
# Scale down: queue is shallow or success rate is poor
|
||||
# Scale down: queue is shallow
|
||||
elif queue_size < current_threads * 2:
|
||||
target = max(current_threads - 2, self.min_threads)
|
||||
elif success_rate < self.success_threshold / 2:
|
||||
# Drastic success rate drop - scale down to reduce load
|
||||
target = max(current_threads - 5, self.min_threads)
|
||||
|
||||
if target != current_threads:
|
||||
self.last_scale_time = now
|
||||
@@ -821,7 +816,8 @@ class ProxyTestState():
|
||||
When all tests complete, evaluate() determines final pass/fail.
|
||||
"""
|
||||
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
|
||||
country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False):
|
||||
country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False,
|
||||
completion_queue=None):
|
||||
self.ip = ip
|
||||
self.port = int(port)
|
||||
self.proxy = '%s:%s' % (ip, port)
|
||||
@@ -836,6 +832,7 @@ class ProxyTestState():
|
||||
self.asn = asn
|
||||
self.isoldies = oldies
|
||||
self.num_targets = num_targets
|
||||
self.completion_queue = completion_queue # for signaling completion
|
||||
|
||||
# thread-safe result accumulation
|
||||
self.lock = threading.Lock()
|
||||
@@ -852,7 +849,11 @@ class ProxyTestState():
|
||||
self.cert_error = False
|
||||
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
|
||||
"""Record a single target test result. Thread-safe."""
|
||||
"""Record a single target test result. Thread-safe.
|
||||
|
||||
When all target tests complete, signals via completion_queue.
|
||||
"""
|
||||
should_signal = False
|
||||
with self.lock:
|
||||
self.results.append({
|
||||
'success': success,
|
||||
@@ -873,9 +874,20 @@ class ProxyTestState():
|
||||
# Track cert errors
|
||||
if category == 'cert_error' or category == 'ssl_error':
|
||||
self.cert_error = True
|
||||
# Check completion (inside lock to prevent race)
|
||||
if not self.completed and len(self.results) >= self.num_targets:
|
||||
self.completed = True
|
||||
should_signal = True
|
||||
# Signal outside lock to avoid deadlock
|
||||
if should_signal and self.completion_queue is not None:
|
||||
self.completion_queue.put(self)
|
||||
|
||||
def is_complete(self):
|
||||
"""Check if all target tests have finished."""
|
||||
# Fast path: check completed flag without lock (atomic read)
|
||||
if self.completed:
|
||||
return True
|
||||
# Slow path: check with lock (only during transition)
|
||||
with self.lock:
|
||||
return len(self.results) >= self.num_targets
|
||||
|
||||
@@ -1277,9 +1289,10 @@ class Proxywatchd():
|
||||
# shared work-stealing queues
|
||||
self.job_queue = PriorityJobQueue()
|
||||
self.result_queue = Queue.Queue()
|
||||
self.completion_queue = Queue.Queue() # completed ProxyTestState objects
|
||||
|
||||
# track pending proxy states (for multi-target aggregation)
|
||||
self.pending_states = [] # list of ProxyTestState awaiting completion
|
||||
self.pending_states = {} # dict: proxy -> ProxyTestState (O(1) lookup/removal)
|
||||
self.pending_lock = threading.Lock()
|
||||
|
||||
# create table if needed (use dbs.py for canonical schema)
|
||||
@@ -1297,12 +1310,12 @@ class Proxywatchd():
|
||||
self.last_cleanup = time.time()
|
||||
self.httpd_server = None
|
||||
|
||||
# Dynamic thread scaling (with gevent, greenlets are cheap - use higher limits)
|
||||
min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 2)
|
||||
# Dynamic thread scaling
|
||||
min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
|
||||
self.scaler = ThreadScaler(
|
||||
min_threads=min_t,
|
||||
max_threads=config.watchd.threads * 10, # greenlets allow much higher concurrency
|
||||
target_queue_per_thread=5 # lower threshold for faster scaling
|
||||
max_threads=config.watchd.threads, # respect configured thread limit
|
||||
target_queue_per_thread=5
|
||||
)
|
||||
self.thread_id_counter = 0
|
||||
self.last_jobs_log = 0
|
||||
@@ -1346,9 +1359,9 @@ class Proxywatchd():
|
||||
# Update runtime values
|
||||
self.submit_after = config.watchd.submit_after
|
||||
|
||||
# Update scaler limits (greenlets allow higher concurrency)
|
||||
self.scaler.min_threads = max(5, config.watchd.threads // 2)
|
||||
self.scaler.max_threads = config.watchd.threads * 10
|
||||
# Update scaler limits
|
||||
self.scaler.min_threads = max(5, config.watchd.threads // 4)
|
||||
self.scaler.max_threads = config.watchd.threads
|
||||
|
||||
# Warn about values requiring restart
|
||||
if config.watchd.checktype != old_checktype:
|
||||
@@ -1454,7 +1467,7 @@ class Proxywatchd():
|
||||
state = ProxyTestState(
|
||||
row[0], row[1], row[2], row[3], row[4], row[5],
|
||||
row[6], row[7], row[8], asn=row[9], num_targets=num_targets,
|
||||
oldies=self.isoldies
|
||||
oldies=self.isoldies, completion_queue=self.completion_queue
|
||||
)
|
||||
new_states.append(state)
|
||||
|
||||
@@ -1469,9 +1482,10 @@ class Proxywatchd():
|
||||
# shuffle to interleave tests across different proxies
|
||||
random.shuffle(all_jobs)
|
||||
|
||||
# track pending states
|
||||
# track pending states (dict for O(1) lookup/removal)
|
||||
with self.pending_lock:
|
||||
self.pending_states.extend(new_states)
|
||||
for state in new_states:
|
||||
self.pending_states[state.proxy] = state
|
||||
|
||||
# queue all jobs with priority
|
||||
for job in all_jobs:
|
||||
@@ -1501,20 +1515,25 @@ class Proxywatchd():
|
||||
except Queue.Empty:
|
||||
break
|
||||
|
||||
# check for completed proxy states and evaluate them
|
||||
with self.pending_lock:
|
||||
still_pending = []
|
||||
for state in self.pending_states:
|
||||
if state.is_complete():
|
||||
success, category = state.evaluate()
|
||||
self.stats.record(success, category, state.proto, state.last_latency_ms,
|
||||
state.country, state.asn,
|
||||
ssl_test=state.had_ssl_test, mitm=(state.mitm > 0),
|
||||
cert_error=state.cert_error)
|
||||
self.collected.append(state)
|
||||
else:
|
||||
still_pending.append(state)
|
||||
self.pending_states = still_pending
|
||||
# process completed states from completion queue (event-driven, not polling)
|
||||
# ProxyTestState.record_result() pushes to completion_queue when all targets done
|
||||
completed_count = 0
|
||||
while True:
|
||||
try:
|
||||
state = self.completion_queue.get_nowait()
|
||||
completed_count += 1
|
||||
# evaluate and record stats
|
||||
success, category = state.evaluate()
|
||||
self.stats.record(success, category, state.proto, state.last_latency_ms,
|
||||
state.country, state.asn,
|
||||
ssl_test=state.had_ssl_test, mitm=(state.mitm > 0),
|
||||
cert_error=state.cert_error)
|
||||
self.collected.append(state)
|
||||
# remove from pending dict (O(1))
|
||||
with self.pending_lock:
|
||||
self.pending_states.pop(state.proxy, None)
|
||||
except Queue.Empty:
|
||||
break
|
||||
|
||||
def collect_unfinished(self):
|
||||
# drain any remaining jobs from job queue
|
||||
@@ -1584,14 +1603,16 @@ class Proxywatchd():
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
|
||||
self.mysqlite.executemany(query, args)
|
||||
|
||||
# Update latency metrics for successful proxies
|
||||
for proxy, latency_ms in latency_updates:
|
||||
dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms)
|
||||
# Batch update latency metrics for successful proxies
|
||||
if latency_updates:
|
||||
dbs.batch_update_proxy_latency(self.mysqlite, latency_updates)
|
||||
|
||||
# Update anonymity for proxies with exit IP data
|
||||
for job in self.collected:
|
||||
if job.failcount == 0 and job.exit_ip:
|
||||
dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers)
|
||||
# Batch update anonymity for proxies with exit IP data
|
||||
anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
|
||||
for job in self.collected
|
||||
if job.failcount == 0 and job.exit_ip]
|
||||
if anonymity_updates:
|
||||
dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates)
|
||||
|
||||
self.mysqlite.commit()
|
||||
self._close_db()
|
||||
@@ -1696,6 +1717,7 @@ class Proxywatchd():
|
||||
stats_data['max_threads'] = self.scaler.max_threads
|
||||
stats_data['queue_size'] = self.job_queue.qsize()
|
||||
stats_data['checktype'] = config.watchd.checktype
|
||||
stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
|
||||
stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)
|
||||
|
||||
# Tor pool stats
|
||||
|
||||
Reference in New Issue
Block a user