diff --git a/proxywatchd.py b/proxywatchd.py index e8ad806..47a464f 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -1,4 +1,5 @@ #!/usr/bin/env python2 +from __future__ import division # Gevent monkey-patching MUST happen before any other imports # This patches threading, socket, time, etc. for cooperative concurrency @@ -563,9 +564,7 @@ class Stats(): self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0) self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0) self.peak_rate = state.get('peak_rate', 0.0) - # Restore start_time to preserve uptime calculation - if state.get('start_time'): - self.start_time = state['start_time'] + # Note: start_time is NOT restored - uptime reflects current session # Restore failure categories if state.get('fail_categories'): self.fail_categories = dict(state['fail_categories']) @@ -767,18 +766,14 @@ class ThreadScaler(object): target = current_threads - # Scale up: queue is deep and success rate is acceptable + # Scale up: queue is deep - scale based on queue depth only # With greenlets, scale more aggressively (+5 instead of +2) if queue_size > current_threads * self.target_queue_per_thread: - if success_rate >= self.success_threshold: - target = min(current_threads + 5, ideal, self.max_threads) + target = min(current_threads + 5, ideal, self.max_threads) - # Scale down: queue is shallow or success rate is poor + # Scale down: queue is shallow elif queue_size < current_threads * 2: target = max(current_threads - 2, self.min_threads) - elif success_rate < self.success_threshold / 2: - # Drastic success rate drop - scale down to reduce load - target = max(current_threads - 5, self.min_threads) if target != current_threads: self.last_scale_time = now @@ -821,7 +816,8 @@ class ProxyTestState(): When all tests complete, evaluate() determines final pass/fail. """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, - country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False): + country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False, + completion_queue=None): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) @@ -836,6 +832,7 @@ class ProxyTestState(): self.asn = asn self.isoldies = oldies self.num_targets = num_targets + self.completion_queue = completion_queue # for signaling completion # thread-safe result accumulation self.lock = threading.Lock() @@ -852,7 +849,11 @@ class ProxyTestState(): self.cert_error = False def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): - """Record a single target test result. Thread-safe.""" + """Record a single target test result. Thread-safe. + + When all target tests complete, signals via completion_queue. + """ + should_signal = False with self.lock: self.results.append({ 'success': success, @@ -873,9 +874,20 @@ class ProxyTestState(): # Track cert errors if category == 'cert_error' or category == 'ssl_error': self.cert_error = True + # Check completion (inside lock to prevent race) + if not self.completed and len(self.results) >= self.num_targets: + self.completed = True + should_signal = True + # Signal outside lock to avoid deadlock + if should_signal and self.completion_queue is not None: + self.completion_queue.put(self) def is_complete(self): """Check if all target tests have finished.""" + # Fast path: check completed flag without lock (atomic read) + if self.completed: + return True + # Slow path: check with lock (only during transition) with self.lock: return len(self.results) >= self.num_targets @@ -1277,9 +1289,10 @@ class Proxywatchd(): # shared work-stealing queues self.job_queue = PriorityJobQueue() self.result_queue = Queue.Queue() + self.completion_queue = Queue.Queue() # completed ProxyTestState objects # track pending proxy states (for multi-target aggregation) - self.pending_states = [] # list of ProxyTestState awaiting completion + self.pending_states = {} # dict: proxy -> ProxyTestState (O(1) lookup/removal) self.pending_lock = threading.Lock() # create table if needed (use dbs.py for canonical schema) @@ -1297,12 +1310,12 @@ class Proxywatchd(): self.last_cleanup = time.time() self.httpd_server = None - # Dynamic thread scaling (with gevent, greenlets are cheap - use higher limits) - min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 2) + # Dynamic thread scaling + min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4) self.scaler = ThreadScaler( min_threads=min_t, - max_threads=config.watchd.threads * 10, # greenlets allow much higher concurrency - target_queue_per_thread=5 # lower threshold for faster scaling + max_threads=config.watchd.threads, # respect configured thread limit + target_queue_per_thread=5 ) self.thread_id_counter = 0 self.last_jobs_log = 0 @@ -1346,9 +1359,9 @@ class Proxywatchd(): # Update runtime values self.submit_after = config.watchd.submit_after - # Update scaler limits (greenlets allow higher concurrency) - self.scaler.min_threads = max(5, config.watchd.threads // 2) - self.scaler.max_threads = config.watchd.threads * 10 + # Update scaler limits + self.scaler.min_threads = max(5, config.watchd.threads // 4) + self.scaler.max_threads = config.watchd.threads # Warn about values requiring restart if config.watchd.checktype != old_checktype: @@ -1454,7 +1467,7 @@ class Proxywatchd(): state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], asn=row[9], num_targets=num_targets, - oldies=self.isoldies + oldies=self.isoldies, completion_queue=self.completion_queue ) new_states.append(state) @@ -1469,9 +1482,10 @@ class Proxywatchd(): # shuffle to interleave tests across different proxies random.shuffle(all_jobs) - # track pending states + # track pending states (dict for O(1) lookup/removal) with self.pending_lock: - self.pending_states.extend(new_states) + for state in new_states: + self.pending_states[state.proxy] = state # queue all jobs with priority for job in all_jobs: @@ -1501,20 +1515,25 @@ class Proxywatchd(): except Queue.Empty: break - # check for completed proxy states and evaluate them - with self.pending_lock: - still_pending = [] - for state in self.pending_states: - if state.is_complete(): - success, category = state.evaluate() - self.stats.record(success, category, state.proto, state.last_latency_ms, - state.country, state.asn, - ssl_test=state.had_ssl_test, mitm=(state.mitm > 0), - cert_error=state.cert_error) - self.collected.append(state) - else: - still_pending.append(state) - self.pending_states = still_pending + # process completed states from completion queue (event-driven, not polling) + # ProxyTestState.record_result() pushes to completion_queue when all targets done + completed_count = 0 + while True: + try: + state = self.completion_queue.get_nowait() + completed_count += 1 + # evaluate and record stats + success, category = state.evaluate() + self.stats.record(success, category, state.proto, state.last_latency_ms, + state.country, state.asn, + ssl_test=state.had_ssl_test, mitm=(state.mitm > 0), + cert_error=state.cert_error) + self.collected.append(state) + # remove from pending dict (O(1)) + with self.pending_lock: + self.pending_states.pop(state.proxy, None) + except Queue.Empty: + break def collect_unfinished(self): # drain any remaining jobs from job queue @@ -1584,14 +1603,16 @@ class Proxywatchd(): query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' self.mysqlite.executemany(query, args) - # Update latency metrics for successful proxies - for proxy, latency_ms in latency_updates: - dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms) + # Batch update latency metrics for successful proxies + if latency_updates: + dbs.batch_update_proxy_latency(self.mysqlite, latency_updates) - # Update anonymity for proxies with exit IP data - for job in self.collected: - if job.failcount == 0 and job.exit_ip: - dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers) + # Batch update anonymity for proxies with exit IP data + anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers) + for job in self.collected + if job.failcount == 0 and job.exit_ip] + if anonymity_updates: + dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates) self.mysqlite.commit() self._close_db() @@ -1696,6 +1717,7 @@ class Proxywatchd(): stats_data['max_threads'] = self.scaler.max_threads stats_data['queue_size'] = self.job_queue.qsize() stats_data['checktype'] = config.watchd.checktype + stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) # Tor pool stats