diff --git a/TODO.md b/TODO.md index cdb089b..d71f55b 100644 --- a/TODO.md +++ b/TODO.md @@ -108,82 +108,28 @@ and report() methods. Integrated into main loop with configurable stats_interval --- -### [ ] 12. Dynamic Thread Scaling +### [x] 12. Dynamic Thread Scaling -**Problem:** Fixed thread count regardless of success rate or system load. - -**Implementation:** -```python -# proxywatchd.py -class ThreadScaler: - """Dynamically adjust thread count based on performance.""" - - def __init__(self, min_threads=5, max_threads=50): - self.min = min_threads - self.max = max_threads - self.current = min_threads - self.success_rate_window = [] - - def record_result(self, success): - self.success_rate_window.append(success) - if len(self.success_rate_window) > 100: - self.success_rate_window.pop(0) - - def recommended_threads(self): - if len(self.success_rate_window) < 20: - return self.current - - success_rate = sum(self.success_rate_window) / len(self.success_rate_window) - - # High success rate -> can handle more threads - if success_rate > 0.7 and self.current < self.max: - return self.current + 5 - # Low success rate -> reduce load - elif success_rate < 0.3 and self.current > self.min: - return self.current - 5 - - return self.current -``` - -**Files:** proxywatchd.py -**Effort:** Medium -**Risk:** Medium +**Completed.** Added dynamic thread scaling based on queue depth and success rate. +- ThreadScaler class in proxywatchd.py with should_scale(), status_line() +- Scales up when queue is deep (2x target) and success rate > 10% +- Scales down when queue is shallow or success rate drops +- Min/max threads derived from config.watchd.threads (1/4x to 2x) +- 30-second cooldown between scaling decisions +- _spawn_thread(), _remove_thread(), _adjust_threads() helper methods +- Scaler status reported alongside periodic stats --- -### [ ] 13. Latency Tracking +### [x] 13. Latency Tracking -**Problem:** No visibility into proxy speed. 
A working but slow proxy may be -less useful than a fast one. - -**Implementation:** -```python -# dbs.py - add columns -# ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0 -# ALTER TABLE proxylist ADD COLUMN latency_samples INTEGER DEFAULT 0 - -def update_proxy_latency(proxydb, proxy, latency): - """Update rolling average latency for proxy.""" - row = proxydb.execute( - 'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy=?', - (proxy,) - ).fetchone() - - if row: - old_avg, samples = row - # Exponential moving average - new_avg = (old_avg * samples + latency) / (samples + 1) - new_samples = min(samples + 1, 100) # Cap at 100 samples - - proxydb.execute( - 'UPDATE proxylist SET avg_latency=?, latency_samples=? WHERE proxy=?', - (new_avg, new_samples, proxy) - ) -``` - -**Files:** dbs.py, proxywatchd.py -**Effort:** Medium -**Risk:** Low +**Completed.** Added per-proxy latency tracking with exponential moving average. +- dbs.py: avg_latency, latency_samples columns added to proxylist schema +- dbs.py: _migrate_latency_columns() for backward-compatible migration +- dbs.py: update_proxy_latency() with EMA (alpha = 2/(samples+1)) +- proxywatchd.py: ProxyTestState.last_latency_ms field +- proxywatchd.py: evaluate() calculates average latency from successful tests +- proxywatchd.py: submit_collected() records latency for passing proxies --- diff --git a/dbs.py b/dbs.py index 4e5f489..b401d2a 100644 --- a/dbs.py +++ b/dbs.py @@ -6,6 +6,46 @@ import time from misc import _log +def _migrate_latency_columns(sqlite): + """Add latency columns to existing databases.""" + try: + sqlite.execute('SELECT avg_latency FROM proxylist LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0') + sqlite.execute('ALTER TABLE proxylist ADD COLUMN latency_samples INT DEFAULT 0') + sqlite.commit() + + +def update_proxy_latency(sqlite, proxy, latency_ms): + """Update rolling average latency for a proxy. 
class ThreadScaler(object):
    """Recommend a worker-thread count from queue depth and success rate.

    The scaler only computes a target; starting and stopping threads is left
    to the caller.

    Scales up when:
      - queue depth exceeds twice the per-thread target, and
      - success rate is at or above ``success_threshold``
    Scales down when:
      - the queue is nearly empty (fewer than two jobs per thread), or
      - success rate falls below half of ``success_threshold``, whatever the
        queue depth
    """

    def __init__(self, min_threads=2, max_threads=50, target_queue_per_thread=10):
        """Set the scaling bounds.

        Args:
            min_threads: Floor for the recommended thread count.
            max_threads: Ceiling for the recommended thread count; raised to
                ``min_threads`` if given lower so the bounds stay consistent.
            target_queue_per_thread: Desired number of queued jobs per
                thread; values below 1 are treated as 1 to keep the
                ideal-thread division defined.
        """
        self.min_threads = min_threads
        self.max_threads = max(max_threads, min_threads)
        self.target_queue_per_thread = max(1, target_queue_per_thread)
        self.last_scale_time = 0
        self.scale_cooldown = 30  # seconds between scaling decisions
        self.success_threshold = 10.0  # minimum success rate % to scale up

    def should_scale(self, current_threads, queue_size, stats):
        """Determine the thread count the pool should move to.

        Args:
            current_threads: Current number of active threads.
            queue_size: Current job queue depth.
            stats: Object exposing ``lock`` (a context manager) and the
                counters ``tested`` and ``passed``.

        Returns:
            int: Target thread count; equal to ``current_threads`` when no
            change is recommended or the cooldown has not elapsed.
        """
        now = time.time()
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads

        # Read both counters under the lock so they belong to one snapshot.
        with stats.lock:
            total = stats.tested
            passed = stats.passed
        # Without any samples assume a neutral 50% so an idle start neither
        # blocks scale-up nor triggers the poor-success scale-down.
        success_rate = (passed * 100.0 / total) if total > 0 else 50.0

        per_thread = self.target_queue_per_thread
        # Thread count that would put roughly per_thread jobs on each worker.
        ideal = min(max(self.min_threads, queue_size // per_thread),
                    self.max_threads)

        queue_deep = queue_size > current_threads * per_thread * 2
        queue_shallow = queue_size < current_threads * 2

        target = current_threads
        if queue_shallow:
            # Little work left: shed one thread. The max() also lifts a pool
            # that is currently below the floor back up to min_threads.
            target = max(current_threads - 1, self.min_threads)
        elif success_rate < self.success_threshold / 2.0:
            # Drastic success-rate drop: back off to reduce load. Checked
            # before the deep-queue case so a backlog cannot mask it.
            target = max(current_threads - 2, self.min_threads)
        elif queue_deep and success_rate >= self.success_threshold:
            # Backlog with acceptable success: grow by two, capped by both
            # the ideal count and max_threads.
            target = min(current_threads + 2, ideal, self.max_threads)

        if target != current_threads:
            # Only an actual change starts the cooldown window.
            self.last_scale_time = now

        return target

    def status_line(self, current_threads, queue_size):
        """Return status string for logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
thread + wt = self.threads.pop() + wt.stop() + # Don't wait for it - let it finish its current job + return wt.id + + def _adjust_threads(self, target): + """Adjust thread count to target.""" + current = len(self.threads) + if target > current: + added = [] + for _ in range(target - current): + tid = self._spawn_thread() + added.append(tid) + if added: + _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler') + elif target < current: + removed = [] + for _ in range(current - target): + tid = self._remove_thread() + if tid: + removed.append(tid) + if removed: + _log('scaled down: -%d threads' % len(removed), 'scaler') + def fetch_rows(self): self.isoldies = False q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' @@ -676,28 +798,46 @@ class Proxywatchd(): # they'll be re-tested in the next cycle def submit_collected(self): - if len(self.collected) == 0: return True + if len(self.collected) == 0: + return True sc = 0 args = [] + latency_updates = [] # (proxy, latency_ms) for successful tests for job in self.collected: - if job.failcount == 0: sc += 1 - args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) + if job.failcount == 0: + sc += 1 + if job.last_latency_ms is not None: + latency_updates.append((job.proxy, job.last_latency_ms)) + args.append((job.failcount, job.checktime, 1, job.country, job.proto, + job.success_count, job.total_duration, job.mitm, + job.consecutive_success, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard: - _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? 
won't submit fails"%success_rate, "ERROR") - if sc == 0: return False + _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR") + if sc == 0: + return False args = [] + latency_updates = [] for job in self.collected: if job.failcount == 0: - args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) + args.append((job.failcount, job.checktime, 1, job.country, job.proto, + job.success_count, job.total_duration, job.mitm, + job.consecutive_success, job.proxy)) + if job.last_latency_ms is not None: + latency_updates.append((job.proxy, job.last_latency_ms)) ret = False - _log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd') + _log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd') self._prep_db() query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?' self.mysqlite.executemany(query, args) + + # Update latency metrics for successful proxies + for proxy, latency_ms in latency_updates: + dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms) + self.mysqlite.commit() self._close_db() self.collected = [] @@ -803,6 +943,18 @@ class Proxywatchd(): pool = connection_pool.get_pool() if pool: _log(pool.status_line(), 'stats') + # Report scaler status + _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler') + + # Dynamic thread scaling + if self.in_background: + target = self.scaler.should_scale( + len(self.threads), + self.job_queue.qsize(), + self.stats + ) + if target != len(self.threads): + self._adjust_threads(target) # periodic stale proxy cleanup (daily) if (time.time() - self.last_cleanup) >= 86400: