add latency tracking and dynamic thread scaling
- dbs.py: add avg_latency, latency_samples columns with migration
- dbs.py: update_proxy_latency() with exponential moving average
- proxywatchd.py: ThreadScaler class for dynamic thread count
- proxywatchd.py: calculate/record latency for successful proxies
- proxywatchd.py: _spawn_thread(), _remove_thread(), _adjust_threads()
- scaler reports status alongside periodic stats
This commit is contained in:
166
proxywatchd.py
166
proxywatchd.py
@@ -22,6 +22,7 @@ except (ImportError, IOError):
|
||||
from config import Config
|
||||
|
||||
import mysqlite
|
||||
import dbs
|
||||
from misc import _log, categorize_error
|
||||
import rocksock
|
||||
import connection_pool
|
||||
@@ -201,6 +202,75 @@ def calculate_priority(failcount, success_count, max_fail):
|
||||
# High fail count
|
||||
return 4
|
||||
|
||||
|
||||
class ThreadScaler(object):
    """Dynamic thread scaling based on queue depth and success rate.

    Scales up when:
    - Queue depth exceeds high watermark
    - Success rate is above threshold
    Scales down when:
    - Queue is nearly empty
    - Success rate drops below threshold
    """

    def __init__(self, min_threads=2, max_threads=50, target_queue_per_thread=10):
        # Hard bounds on the worker pool size.
        self.min_threads = min_threads
        self.max_threads = max_threads
        # Desired number of queued jobs per worker thread.
        self.target_queue_per_thread = target_queue_per_thread
        self.last_scale_time = 0
        self.scale_cooldown = 30  # seconds between scaling decisions
        self.success_threshold = 10.0  # minimum success rate % to scale up

    def should_scale(self, current_threads, queue_size, stats):
        """Determine if scaling is needed.

        Args:
            current_threads: Current number of active threads
            queue_size: Current job queue depth
            stats: Stats object with success/fail counts (``lock``,
                ``tested``, ``passed`` attributes)

        Returns:
            int: Target thread count (may be same as current)
        """
        now = time.time()
        # Rate-limit scaling decisions to one per cooldown window.
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads

        # Snapshot counters under the stats lock; compute outside it.
        with stats.lock:
            total = stats.tested
            passed = stats.passed
        # With no samples yet, assume a neutral 50% so startup neither
        # scales up nor down on success-rate grounds.  The `total > 0`
        # guard makes a safe-division helper unnecessary here.
        success_rate = (passed * 100.0 / total) if total > 0 else 50.0

        # Ideal thread count derived from queue depth, clamped to bounds.
        ideal = max(self.min_threads, queue_size // self.target_queue_per_thread)
        ideal = min(ideal, self.max_threads)

        target = current_threads

        # BUGFIX: the drastic success-rate check used to sit behind the
        # deep-queue branch as an elif, so a failing proxy pool with a
        # deep queue never scaled down (contradicting the class
        # docstring).  Check it first: reduce load regardless of depth.
        if success_rate < self.success_threshold / 2:
            # Drastic success rate drop - scale down to reduce load
            target = max(current_threads - 2, self.min_threads)
        # Scale up: queue is deep and success rate is acceptable
        elif (queue_size > current_threads * self.target_queue_per_thread * 2
                and success_rate >= self.success_threshold):
            target = min(current_threads + 2, ideal, self.max_threads)
        # Scale down: queue is shallow
        elif queue_size < current_threads * 2:
            target = max(current_threads - 1, self.min_threads)

        if target != current_threads:
            self.last_scale_time = now

        return target

    def status_line(self, current_threads, queue_size):
        """Return status string for logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
|
||||
|
||||
def socks4_resolve(srvname, server_port):
|
||||
srv = srvname
|
||||
if srv in cached_dns:
|
||||
@@ -251,6 +321,7 @@ class ProxyTestState():
|
||||
self.lock = threading.Lock()
|
||||
self.results = [] # list of (success, proto, duration, srv, tor, ssl)
|
||||
self.completed = False
|
||||
self.last_latency_ms = None # average latency from successful tests
|
||||
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None):
|
||||
"""Record a single target test result. Thread-safe."""
|
||||
@@ -323,6 +394,11 @@ class ProxyTestState():
|
||||
self.success_count += 1
|
||||
self.total_duration += int(last_good['duration'] * 1000)
|
||||
|
||||
# Calculate average latency from successful tests (in ms)
|
||||
durations = [s['duration'] for s in successes if s['duration']]
|
||||
if durations:
|
||||
self.last_latency_ms = sum(durations) * 1000.0 / len(durations)
|
||||
|
||||
torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
|
||||
_log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
|
||||
last_good['proto'], self.ip, self.port, self.country,
|
||||
@@ -567,6 +643,52 @@ class Proxywatchd():
|
||||
self.last_cleanup = time.time()
|
||||
self.httpd_server = None
|
||||
|
||||
# Dynamic thread scaling
|
||||
self.scaler = ThreadScaler(
|
||||
min_threads=max(2, config.watchd.threads // 4),
|
||||
max_threads=config.watchd.threads * 2,
|
||||
target_queue_per_thread=10
|
||||
)
|
||||
self.thread_id_counter = 0
|
||||
|
||||
def _spawn_thread(self):
    """Create, start and register one additional worker thread.

    Returns:
        str: the identifier assigned to the new thread ('dynN').
    """
    # Monotonic counter keeps dynamically-spawned thread ids unique.
    self.thread_id_counter += 1
    new_id = 'dyn%d' % self.thread_id_counter
    worker = WorkerThread(new_id, self.job_queue, self.result_queue)
    worker.start_thread()
    self.threads.append(worker)
    return new_id
||||
|
||||
def _remove_thread(self):
|
||||
"""Remove one worker thread (graceful shutdown)."""
|
||||
if len(self.threads) <= self.scaler.min_threads:
|
||||
return None
|
||||
# Stop the last thread
|
||||
wt = self.threads.pop()
|
||||
wt.stop()
|
||||
# Don't wait for it - let it finish its current job
|
||||
return wt.id
|
||||
|
||||
def _adjust_threads(self, target):
    """Grow or shrink the worker pool until it holds `target` threads."""
    current = len(self.threads)
    if target > current:
        # Spawn the missing workers, collecting their ids for the log.
        added = [self._spawn_thread() for _ in range(target - current)]
        if added:
            _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler')
    elif target < current:
        # _remove_thread() may decline (minimum pool size reached), so
        # only count the removals that actually happened.
        removed = []
        for _ in range(current - target):
            tid = self._remove_thread()
            if tid:
                removed.append(tid)
        if removed:
            _log('scaled down: -%d threads' % len(removed), 'scaler')
|
||||
|
||||
def fetch_rows(self):
|
||||
self.isoldies = False
|
||||
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
||||
@@ -676,28 +798,46 @@ class Proxywatchd():
|
||||
# they'll be re-tested in the next cycle
|
||||
|
||||
def submit_collected(self):
|
||||
if len(self.collected) == 0: return True
|
||||
if len(self.collected) == 0:
|
||||
return True
|
||||
sc = 0
|
||||
args = []
|
||||
latency_updates = [] # (proxy, latency_ms) for successful tests
|
||||
for job in self.collected:
|
||||
if job.failcount == 0: sc += 1
|
||||
args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) )
|
||||
if job.failcount == 0:
|
||||
sc += 1
|
||||
if job.last_latency_ms is not None:
|
||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.proxy))
|
||||
|
||||
success_rate = (float(sc) / len(self.collected)) * 100
|
||||
ret = True
|
||||
if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
|
||||
_log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR")
|
||||
if sc == 0: return False
|
||||
_log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
|
||||
if sc == 0:
|
||||
return False
|
||||
args = []
|
||||
latency_updates = []
|
||||
for job in self.collected:
|
||||
if job.failcount == 0:
|
||||
args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) )
|
||||
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.proxy))
|
||||
if job.last_latency_ms is not None:
|
||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||
ret = False
|
||||
|
||||
_log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd')
|
||||
_log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd')
|
||||
self._prep_db()
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?'
|
||||
self.mysqlite.executemany(query, args)
|
||||
|
||||
# Update latency metrics for successful proxies
|
||||
for proxy, latency_ms in latency_updates:
|
||||
dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms)
|
||||
|
||||
self.mysqlite.commit()
|
||||
self._close_db()
|
||||
self.collected = []
|
||||
@@ -803,6 +943,18 @@ class Proxywatchd():
|
||||
pool = connection_pool.get_pool()
|
||||
if pool:
|
||||
_log(pool.status_line(), 'stats')
|
||||
# Report scaler status
|
||||
_log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')
|
||||
|
||||
# Dynamic thread scaling
|
||||
if self.in_background:
|
||||
target = self.scaler.should_scale(
|
||||
len(self.threads),
|
||||
self.job_queue.qsize(),
|
||||
self.stats
|
||||
)
|
||||
if target != len(self.threads):
|
||||
self._adjust_threads(target)
|
||||
|
||||
# periodic stale proxy cleanup (daily)
|
||||
if (time.time() - self.last_cleanup) >= 86400:
|
||||
|
||||
Reference in New Issue
Block a user