From 6cc903c92445aa0c3fb2d6997bbf3fcd700f7a39 Mon Sep 17 00:00:00 2001 From: Username Date: Thu, 8 Jan 2026 09:02:56 +0100 Subject: [PATCH] httpd: add batch API endpoint and worker improvements - /api/dashboard: single endpoint returning stats + workers + countries - dashboard.js: use batch endpoint (2 requests -> 1 per poll cycle) - _get_workers_data: refactored from /api/workers for code reuse - worker verification: trust scoring based on result accuracy - fair distribution: dynamic batch sizing based on queue and workers - queue tracking: session progress, due/claimed/pending counts --- httpd.py | 580 +++++++++++++++++++++++++++++++++----------- static/dashboard.js | 166 +++++++++---- 2 files changed, 568 insertions(+), 178 deletions(-) diff --git a/httpd.py b/httpd.py index ad6b387..2e92c27 100644 --- a/httpd.py +++ b/httpd.py @@ -18,6 +18,18 @@ import sys from collections import defaultdict import mysqlite from misc import _log +from dbs import (create_verification_tables, insert_proxy_result, + check_for_disagreement, queue_verification, + get_verification_stats, get_all_worker_trust) + +# IP geolocation (optional) +try: + import IP2Location + _geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) + _geolite = True +except (ImportError, IOError, ValueError): + _geodb = None + _geolite = False # Rate limiting configuration _rate_limits = defaultdict(list) @@ -91,6 +103,111 @@ _worker_test_history = {} _worker_test_history_lock = threading.Lock() _test_history_window = 120 # seconds to keep test history for rate calculation +# Fair distribution settings +_min_batch_size = 100 # minimum proxies per batch +_max_batch_size = 1000 # maximum proxies per batch +_worker_timeout = 120 # seconds before worker considered inactive + +# Session tracking +_session_start_time = int(time.time()) # when httpd started + +# Testing schedule configuration (set from config at startup) +_working_checktime = 300 # retest interval for working proxies (failed=0) +_fail_retry_interval = 60 # retry interval for failing proxies +_fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...) +_max_fail = 5 # failures before proxy considered dead + + +def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail): + """Set testing schedule parameters from config.""" + global _working_checktime, _fail_retry_interval, _fail_retry_backoff, _max_fail + _working_checktime = working_checktime + _fail_retry_interval = fail_retry_interval + _fail_retry_backoff = fail_retry_backoff + _max_fail = max_fail + + +def _build_due_condition(): + """Build SQL condition for proxy due check. + + Returns (condition_sql, params) where condition_sql is the WHERE clause + and params is a tuple of parameter values. + + Formula: + - failed=0: tested + working_checktime < now + - failed>0 with backoff: tested + (failed * fail_retry_interval) < now + - failed>0 without backoff: tested + fail_retry_interval < now + """ + now_int = int(time.time()) + if _fail_retry_backoff: + # Linear backoff: multiply interval by failure count + condition = ''' + failed >= 0 AND failed < ? + AND (tested IS NULL OR + CASE WHEN failed = 0 + THEN tested + ? < ? + ELSE tested + (failed * ?) < ? + END) + ''' + params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int) + else: + # Fixed interval: same delay regardless of failure count + condition = ''' + failed >= 0 AND failed < ? + AND (tested IS NULL OR + CASE WHEN failed = 0 + THEN tested + ? < ? + ELSE tested + ? < ? + END) + ''' + params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int) + return condition, params + + +def get_active_worker_count(): + """Count workers seen within timeout window.""" + now = time.time() + with _workers_lock: + return sum(1 for w in _workers.values() + if (now - w.get('last_seen', 0)) < _worker_timeout) + + +def get_due_proxy_count(db): + """Count proxies due for testing (not claimed).""" + with _work_claims_lock: + claimed_count = len(_work_claims) + + try: + condition, params = _build_due_condition() + query = 'SELECT COUNT(*) FROM proxylist WHERE ' + condition + result = db.execute(query, params).fetchone() + total_due = result[0] if result else 0 + return max(0, total_due - claimed_count) + except Exception: + return 0 + + +def calculate_fair_batch_size(db, worker_id): + """Calculate fair batch size based on active workers and queue size.""" + active_workers = max(1, get_active_worker_count()) + due_count = get_due_proxy_count(db) + + if due_count == 0: + return _min_batch_size + + # Fair share: divide due work among active workers + # Add 20% buffer for speed variations between workers + fair_share = int((due_count / active_workers) * 1.2) + + # Clamp to bounds + batch_size = max(_min_batch_size, min(fair_share, _max_batch_size)) + + _log('fair_batch: due=%d workers=%d share=%d batch=%d' % ( + due_count, active_workers, fair_share, batch_size), 'debug') + + return batch_size + + def load_workers(): """Load worker registry from disk.""" global _workers, _worker_keys @@ -218,8 +335,11 @@ def get_worker_test_rate(worker_id): return 0.0 # Sum tests in window and calculate rate cutoff = now - _test_history_window - total_tests = sum(c for t, c in history if t > cutoff) - oldest = min((t for t, c in history if t > cutoff), default=now) + in_window = [(t, c) for t, c in history if t > cutoff] + if not in_window: + return 0.0 + total_tests = sum(c for t, c in in_window) + oldest = min(t for t, c in in_window) elapsed = now - oldest if elapsed < 1: return 0.0 @@ -230,9 +350,9 @@ def claim_work(db, worker_id, count=100): now = time.time() now_int = int(now) - # Randomize batch size (100-250) to stagger worker completion times - # This avoids thundering herd when all workers finish and request simultaneously - target_count = random.randint(100, 250) + # Calculate fair batch size based on active workers and queue size + # Distributes work evenly: due_proxies / active_workers (with bounds) + target_count = calculate_fair_batch_size(db, worker_id) # Clean up stale claims and get current claimed set with _work_claims_lock: @@ -243,12 +363,7 @@ def claim_work(db, worker_id, count=100): claimed_keys = set(_work_claims.keys()) # Get proxies that need testing - # Respect cooldown: tested + checktime + (failed * perfail_checktime) < now # Priority: untested first, then oldest due - with randomness within tiers - checktime = 1800 # 30 min base cooldown - perfail_checktime = 3600 # +1 hour per failure - max_fail = 5 - try: # Build exclusion clause for already-claimed proxies # Use ip||':'||port to match our claim key format @@ -261,25 +376,43 @@ def claim_work(db, worker_id, count=100): exclude_clause = "" exclude_params = [] - # Priority tiers: 0=untested, 1=due >1hr ago, 2=due <1hr ago - # Random within each tier for fair distribution + # Build due condition using new schedule formula + due_condition, due_params = _build_due_condition() + + # Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due + # Calculate overdue time based on new formula + if _fail_retry_backoff: + overdue_calc = ''' + CASE WHEN failed = 0 + THEN ? - (tested + ?) + ELSE ? - (tested + (failed * ?)) + END + ''' + priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval] + else: + overdue_calc = ''' + CASE WHEN failed = 0 + THEN ? - (tested + ?) + ELSE ? - (tested + ?) + END + ''' + priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval] + query = ''' SELECT ip, port, proto, failed, CASE WHEN tested IS NULL THEN 0 - WHEN (? - (tested + ? + (failed * ?))) > 3600 THEN 1 + WHEN (%s) > 3600 THEN 1 ELSE 2 END as priority FROM proxylist - WHERE failed >= 0 AND failed < ? - AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?) + WHERE %s %s ORDER BY priority, RANDOM() LIMIT ? - ''' % exclude_clause + ''' % (overdue_calc, due_condition, exclude_clause) - params = [now_int, checktime, perfail_checktime, - max_fail, checktime, perfail_checktime, now_int] + exclude_params + [target_count] + params = priority_params + list(due_params) + exclude_params + [target_count] rows = db.execute(query, params).fetchall() except Exception as e: _log('claim_work query error: %s' % e, 'error') @@ -328,28 +461,77 @@ def submit_results(db, worker_id, results): if proxy_key in _work_claims: del _work_claims[proxy_key] - # Update database + # Update database - trust workers, add missing proxies if working try: - if r.get('working'): + working = 1 if r.get('working') else 0 + latency_ms = r.get('latency', 0) if working else None + error_cat = r.get('error_category') if not working else None + + if working: + # Use INSERT OR REPLACE to add working proxies that don't exist db.execute(''' - UPDATE proxylist SET + INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added) + VALUES (?, ?, ?, ?, 0, ?, ?, ?) + ON CONFLICT(proxy) DO UPDATE SET failed = 0, - tested = ?, - latency = ? - WHERE ip = ? AND port = ? - ''', (int(now), r.get('latency', 0), r['ip'], r['port'])) + tested = excluded.tested, + avg_latency = excluded.avg_latency + ''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now), + latency_ms, int(now))) working_count += 1 - total_latency += r.get('latency', 0) + total_latency += latency_ms or 0 + + # Geolocate working proxy if IP2Location available + if _geolite and _geodb: + try: + rec = _geodb.get_all(r['ip']) + if rec and rec.country_short and rec.country_short != '-': + db.execute( + 'UPDATE proxylist SET country=? WHERE proxy=?', + (rec.country_short, proxy_key)) + except Exception: + pass # Geolocation is best-effort else: + # For failures, only update if exists (don't add non-working proxies) db.execute(''' UPDATE proxylist SET failed = failed + 1, tested = ? WHERE ip = ? AND port = ? ''', (int(now), r['ip'], r['port'])) + + # Record result for verification system + insert_proxy_result(db, proxy_key, worker_id, working, + latency_ms=latency_ms, error_category=error_cat) + + # Check for disagreement with other workers + disagreement, other_worker, other_result = check_for_disagreement( + db, proxy_key, worker_id, working) + if disagreement: + # Queue for manager verification (priority 3 = high) + queue_verification(db, proxy_key, 'disagreement', priority=3, + worker_a=worker_id, worker_b=other_worker, + result_a=working, result_b=other_result) + elif working: + # Check for resurrection: was dead (failed >= 3), now working + row = db.execute( + 'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,) + ).fetchone() + if row and row[0] >= 3: + queue_verification(db, proxy_key, 'resurrection', priority=3, + worker_a=worker_id, result_a=1) + else: + # Check for sudden death: was working (consecutive_success >= 3), now failed + row = db.execute( + 'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,) + ).fetchone() + if row and row[0] and row[0] >= 3: + queue_verification(db, proxy_key, 'sudden_death', priority=2, + worker_a=worker_id, result_a=0) + processed += 1 - except Exception: - pass + except Exception as e: + _log('submit_results db error for %s: %s' % (proxy_key, e), 'error') # Update worker stats with _workers_lock: @@ -363,6 +545,9 @@ def submit_results(db, worker_id, results): w['last_batch_size'] = len(results) w['last_batch_working'] = working_count + # Commit database changes + db.commit() + # Record for test rate calculation record_test_rate(worker_id, processed) @@ -513,7 +698,8 @@ def get_system_stats(): _gc_objects_cache['value'] = len(gc.get_objects()) _gc_objects_cache['time'] = now stats['gc_objects'] = _gc_objects_cache['value'] - except Exception: + except Exception as e: + _log('gc stats error: %s' % e, 'debug') stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0 stats['gc_objects'] = 0 @@ -586,8 +772,8 @@ def get_db_health(db): ).fetchone() stats['failing_count'] = row[0] if row else 0 - except Exception: - pass + except Exception as e: + _log('get_db_health error: %s' % e, 'warn') # Update cache _db_health_cache['value'] = stats @@ -859,8 +1045,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): db = mysqlite.mysqlite(self.database, str) stats['db'] = self.get_db_stats() stats['db_health'] = get_db_health(db) - except Exception: - pass + except Exception as e: + _log('handle_stats db error: %s' % e, 'warn') self.send_json(stats) @@ -881,8 +1067,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): if self.stats_provider: try: stats.update(self.stats_provider()) - except Exception: - pass + except Exception as e: + _log('stats_provider error in export: %s' % e, 'warn') stats['system'] = get_system_stats() stats['exported_at'] = time.strftime('%Y-%m-%d %H:%M:%S') @@ -987,6 +1173,13 @@ class ProxyAPIServer(threading.Thread): load_static_files(THEME) # Load worker registry from disk load_workers() + # Create verification tables if they don't exist + try: + db = mysqlite.mysqlite(self.database, str) + create_verification_tables(db) + _log('verification tables initialized', 'debug') + except Exception as e: + _log('failed to create verification tables: %s' % e, 'warn') def _wsgi_app(self, environ, start_response): """WSGI application wrapper for gevent.""" @@ -1009,7 +1202,7 @@ class ProxyAPIServer(threading.Thread): return [b'Method not allowed'] # POST only allowed for worker API endpoints - post_endpoints = ('/api/register', '/api/results') + post_endpoints = ('/api/register', '/api/results', '/api/heartbeat') if method == 'POST' and path not in post_endpoints: start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'POST not allowed for this endpoint'] @@ -1072,6 +1265,7 @@ class ProxyAPIServer(threading.Thread): '/dashboard': 'web dashboard (HTML)', '/map': 'proxy distribution by country (HTML)', '/mitm': 'MITM certificate search (HTML)', + '/api/dashboard': 'batch endpoint: stats + workers + countries (JSON)', '/api/stats': 'runtime statistics (JSON)', '/api/mitm': 'MITM certificate statistics (JSON)', '/api/countries': 'proxy counts by country (JSON)', @@ -1145,12 +1339,53 @@ class ProxyAPIServer(threading.Thread): db = mysqlite.mysqlite(self.database, str) stats['db'] = self._get_db_stats(db) stats['db_health'] = get_db_health(db) - except Exception: - pass + except Exception as e: + _log('api/stats db error: %s' % e, 'warn') # Add profiling flag (from constructor or stats_provider) if 'profiling' not in stats: stats['profiling'] = self.profiling return json.dumps(stats, indent=2), 'application/json', 200 + + elif path == '/api/dashboard': + # Batch endpoint combining stats + workers + countries + # Reduces RTT for dashboard - single request instead of multiple + result = {} + + # 1. Runtime stats (same as /api/stats) + stats = {} + if self.stats_provider: + try: + stats = self.stats_provider() + except Exception as e: + _log('api/dashboard stats_provider error: %s' % e, 'warn') + stats['system'] = get_system_stats() + if 'profiling' not in stats: + stats['profiling'] = self.profiling + result['stats'] = stats + + # 2. Database stats and health + try: + db = mysqlite.mysqlite(self.database, str) + result['stats']['db'] = self._get_db_stats(db) + result['stats']['db_health'] = get_db_health(db) + + # 3. Countries (same as /api/countries) + rows = db.execute( + 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' + 'GROUP BY country ORDER BY c DESC' + ).fetchall() + result['countries'] = {r[0]: r[1] for r in rows} + + # 4. Workers (same as /api/workers) + result['workers'] = self._get_workers_data(db) + + db.close() + except Exception as e: + _log('api/dashboard db error: %s' % e, 'warn') + result['countries'] = {} + result['workers'] = {'workers': [], 'total': 0, 'active': 0} + + return json.dumps(result, indent=2), 'application/json', 200 elif path == '/api/mitm': # MITM certificate statistics if self.stats_provider: @@ -1255,8 +1490,8 @@ class ProxyAPIServer(threading.Thread): if _has_objgraph: try: result['objgraph_common'] = objgraph.most_common_types(limit=15) - except Exception: - pass + except Exception as e: + result['objgraph_error'] = str(e) # Pympler summary (if available) if _has_pympler: @@ -1329,6 +1564,8 @@ class ProxyAPIServer(threading.Thread): results = post_data.get('results', []) if not results: return json.dumps({'error': 'no results provided'}), 'application/json', 400 + working = sum(1 for r in results if r.get('working')) + _log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info') try: db = mysqlite.mysqlite(self.database, str) processed = submit_results(db, worker_id, results) @@ -1348,10 +1585,11 @@ class ProxyAPIServer(threading.Thread): worker_id, _ = get_worker_by_key(key) if not worker_id: return json.dumps({'error': 'worker not found'}), 'application/json', 404 - # Update worker Tor and profiling status + # Update worker Tor, profiling, and thread status tor_ok = post_data.get('tor_ok', True) if post_data else True tor_ip = post_data.get('tor_ip') if post_data else None profiling = post_data.get('profiling', False) if post_data else False + threads = post_data.get('threads', 0) if post_data else 0 now = time.time() with _workers_lock: if worker_id in _workers: @@ -1360,6 +1598,7 @@ class ProxyAPIServer(threading.Thread): _workers[worker_id]['tor_ip'] = tor_ip _workers[worker_id]['tor_last_check'] = now _workers[worker_id]['profiling'] = profiling + _workers[worker_id]['threads'] = threads return json.dumps({ 'worker_id': worker_id, 'message': 'heartbeat received', @@ -1367,94 +1606,14 @@ class ProxyAPIServer(threading.Thread): elif path == '/api/workers': # List connected workers - now = time.time() - with _workers_lock: - workers = [] - total_tested = 0 - total_working = 0 - total_failed = 0 - for wid, info in _workers.items(): - tested = info.get('proxies_tested', 0) - working = info.get('proxies_working', 0) - failed = info.get('proxies_failed', 0) - total_latency = info.get('total_latency', 0) - avg_latency = round(total_latency / working, 2) if working > 0 else 0 - success_rate = round(100 * working / tested, 1) if tested > 0 else 0 - total_tested += tested - total_working += working - total_failed += failed - # Tor status with age check - tor_ok = info.get('tor_ok', True) - tor_ip = info.get('tor_ip') - tor_last_check = info.get('tor_last_check', 0) - tor_age = int(now - tor_last_check) if tor_last_check else None - worker_profiling = info.get('profiling', False) - # Calculate test rate for this worker - test_rate = get_worker_test_rate(wid) - workers.append({ - 'id': wid, - 'name': info['name'], - 'ip': info['ip'], - 'registered': int(info['registered']), - 'last_seen': int(info['last_seen']), - 'age': int(now - info['last_seen']), - 'jobs_completed': info.get('jobs_completed', 0), - 'proxies_tested': tested, - 'proxies_working': working, - 'proxies_failed': failed, - 'success_rate': success_rate, - 'avg_latency': avg_latency, - 'last_batch_size': info.get('last_batch_size', 0), - 'last_batch_working': info.get('last_batch_working', 0), - 'active': (now - info['last_seen']) < 120, - 'test_rate': round(test_rate, 2), - 'tor_ok': tor_ok, - 'tor_ip': tor_ip, - 'tor_age': tor_age, - 'profiling': worker_profiling, - }) - # Sort by name for consistent display - workers.sort(key=lambda w: w['name']) - # Get queue status from database - queue_stats = {'pending': 0, 'claimed': 0, 'due': 0} try: db = mysqlite.mysqlite(self.database, str) - # Pending = total eligible (not dead) - row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < 5').fetchone() - queue_stats['pending'] = row[0] if row else 0 - # Claimed = currently being tested by workers - with _work_claims_lock: - queue_stats['claimed'] = len(_work_claims) - # Due = ready for testing (respecting cooldown) - now_int = int(time.time()) - checktime = 1800 # 30 min base - perfail_checktime = 3600 # +1 hour per failure - max_fail = 5 - row = db.execute(''' - SELECT COUNT(*) FROM proxylist - WHERE failed >= 0 AND failed < ? - AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?) - ''', (max_fail, checktime, perfail_checktime, now_int)).fetchone() - due_total = row[0] if row else 0 - queue_stats['due'] = max(0, due_total - queue_stats['claimed']) + workers_data = self._get_workers_data(db) db.close() - except Exception: - pass - # Calculate combined test rate from active workers - combined_rate = sum(w['test_rate'] for w in workers if w['active']) - return json.dumps({ - 'workers': workers, - 'total': len(workers), - 'active': sum(1 for w in workers if w['active']), - 'summary': { - 'total_tested': total_tested, - 'total_working': total_working, - 'total_failed': total_failed, - 'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0, - 'combined_rate': round(combined_rate, 2), - }, - 'queue': queue_stats, - }, indent=2), 'application/json', 200 + return json.dumps(workers_data, indent=2), 'application/json', 200 + except Exception as e: + _log('api/workers error: %s' % e, 'warn') + return json.dumps({'error': str(e)}), 'application/json', 500 else: return json.dumps({'error': 'not found'}), 'application/json', 404 @@ -1483,25 +1642,172 @@ class ProxyAPIServer(threading.Thread): stats['total'] = row[0] if row else 0 # Proxies due for testing (eligible for worker claims) - now_int = int(time.time()) - checktime = 1800 # 30 min base cooldown - perfail_checktime = 3600 # +1 hour per failure - max_fail = 5 - row = db.execute(''' - SELECT COUNT(*) FROM proxylist - WHERE failed >= 0 AND failed < ? - AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?) - ''', (max_fail, checktime, perfail_checktime, now_int)).fetchone() + due_condition, due_params = _build_due_condition() + row = db.execute( + 'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, + due_params).fetchone() due_total = row[0] if row else 0 # Subtract currently claimed with _work_claims_lock: claimed_count = len(_work_claims) stats['due'] = max(0, due_total - claimed_count) stats['claimed'] = claimed_count - except Exception: - pass + except Exception as e: + _log('_get_db_stats error: %s' % e, 'warn') return stats + def _get_workers_data(self, db): + """Get worker status data. Used by /api/workers and /api/dashboard.""" + now = time.time() + with _workers_lock: + workers = [] + total_tested = 0 + total_working = 0 + total_failed = 0 + for wid, info in _workers.items(): + tested = info.get('proxies_tested', 0) + working = info.get('proxies_working', 0) + failed = info.get('proxies_failed', 0) + total_latency = info.get('total_latency', 0) + avg_latency = round(total_latency / working, 2) if working > 0 else 0 + success_rate = round(100 * working / tested, 1) if tested > 0 else 0 + total_tested += tested + total_working += working + total_failed += failed + # Tor status with age check + tor_ok = info.get('tor_ok', True) + tor_ip = info.get('tor_ip') + tor_last_check = info.get('tor_last_check', 0) + tor_age = int(now - tor_last_check) if tor_last_check else None + worker_profiling = info.get('profiling', False) + worker_threads = info.get('threads', 0) + # Calculate test rate for this worker + test_rate = get_worker_test_rate(wid) + workers.append({ + 'id': wid, + 'name': info['name'], + 'ip': info['ip'], + 'registered': int(info['registered']), + 'last_seen': int(info['last_seen']), + 'age': int(now - info['last_seen']), + 'jobs_completed': info.get('jobs_completed', 0), + 'proxies_tested': tested, + 'proxies_working': working, + 'proxies_failed': failed, + 'success_rate': success_rate, + 'avg_latency': avg_latency, + 'last_batch_size': info.get('last_batch_size', 0), + 'last_batch_working': info.get('last_batch_working', 0), + 'active': (now - info['last_seen']) < 120, + 'test_rate': round(test_rate, 2), + 'tor_ok': tor_ok, + 'tor_ip': tor_ip, + 'tor_age': tor_age, + 'profiling': worker_profiling, + 'threads': worker_threads, + }) + + # Sort by name for consistent display + workers.sort(key=lambda w: w['name']) + + # Get queue status from database + queue_stats = { + 'pending': 0, 'claimed': 0, 'due': 0, 'total': 0, + 'untested': 0, 'session_tested': 0, 'session_pct': 0, + 'session_start': _session_start_time + } + try: + # Total proxies in database + row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() + queue_stats['total'] = row[0] if row else 0 + # Never tested (tested IS NULL) + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested IS NULL').fetchone() + queue_stats['untested'] = row[0] if row else 0 + # Tested this session (since httpd started) + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested >= ?', (_session_start_time,)).fetchone() + queue_stats['session_tested'] = row[0] if row else 0 + # Pending = total eligible (not dead) + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < 5').fetchone() + queue_stats['pending'] = row[0] if row else 0 + # Session progress percentage (tested / total, capped at 100%) + if queue_stats['total'] > 0: + pct = 100.0 * queue_stats['session_tested'] / queue_stats['total'] + queue_stats['session_pct'] = round(min(pct, 100.0), 1) + # Claimed = currently being tested by workers + with _work_claims_lock: + queue_stats['claimed'] = len(_work_claims) + # Due = ready for testing (respecting cooldown) + due_condition, due_params = _build_due_condition() + row = db.execute( + 'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, + due_params).fetchone() + due_total = row[0] if row else 0 + queue_stats['due'] = max(0, due_total - queue_stats['claimed']) + except Exception as e: + _log('_get_workers_data queue stats error: %s' % e, 'warn') + + # Calculate combined test rate from active workers + combined_rate = sum(w['test_rate'] for w in workers if w['active']) + + # Get manager's own stats (local proxy testing) + manager_stats = None + if self.stats_provider: + try: + stats = self.stats_provider() + threads = stats.get('threads', 0) + if threads > 0: # Manager has local testing enabled + manager_stats = { + 'threads': threads, + 'tested': stats.get('tested', 0), + 'passed': stats.get('passed', 0), + 'rate': round(stats.get('recent_rate', 0), 2), + 'success_rate': round(stats.get('success_rate', 0), 1), + 'uptime': stats.get('uptime_seconds', 0), + 'queue_size': stats.get('queue_size', 0), + } + except Exception as e: + _log('_get_workers_data manager stats error: %s' % e, 'warn') + + # Get verification stats and worker trust + verification_stats = {'queue_size': 0, 'queue_by_trigger': {}} + worker_trust_data = {} + try: + verification_stats = get_verification_stats(db) + worker_trust_data = get_all_worker_trust(db) + except Exception as e: + _log('_get_workers_data verification stats error: %s' % e, 'warn') + + # Add trust scores to workers + for w in workers: + wid = w['id'] + if wid in worker_trust_data: + trust = worker_trust_data[wid] + w['trust_score'] = round(trust['trust_score'], 2) + w['verifications'] = trust['verifications'] + w['trust_correct'] = trust['correct'] + w['trust_incorrect'] = trust['incorrect'] + else: + w['trust_score'] = 1.0 # Default for new workers + w['verifications'] = 0 + w['trust_correct'] = 0 + w['trust_incorrect'] = 0 + + return { + 'workers': workers, + 'total': len(workers), + 'active': sum(1 for w in workers if w['active']), + 'manager': manager_stats, + 'summary': { + 'total_tested': total_tested, + 'total_working': total_working, + 'total_failed': total_failed, + 'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0, + 'combined_rate': round(combined_rate, 2), + }, + 'queue': queue_stats, + 'verification': verification_stats, + } + def run(self): ProxyAPIHandler.database = self.database ProxyAPIHandler.stats_provider = self.stats_provider diff --git a/static/dashboard.js b/static/dashboard.js index 4e87ddd..16ff4f5 100644 --- a/static/dashboard.js +++ b/static/dashboard.js @@ -608,19 +608,24 @@ function update(d) { } function fetchStats() { - fetch('/api/stats') + // Use batch endpoint for reduced RTT (single request instead of multiple) + fetch('/api/dashboard') .then(function(r) { return r.json(); }) - .then(update) - .catch(function(e) { $('dot').className = 'dot err'; $('statusTxt').textContent = 'Error'; }); - // Also fetch worker stats - fetchWorkers(); -} - -function fetchWorkers() { - fetch('/api/workers') - .then(function(r) { return r.json(); }) - .then(updateWorkers) - .catch(function(e) { console.error('Failed to fetch workers:', e); }); + .then(function(data) { + // Extract stats (same structure as /api/stats) + if (data.stats) { + update(data.stats); + } + // Extract workers data (same structure as /api/workers) + if (data.workers) { + updateWorkers(data.workers); + } + }) + .catch(function(e) { + $('dot').className = 'dot err'; + $('statusTxt').textContent = 'Error'; + console.error('Failed to fetch dashboard:', e); + }); } function updateWorkers(data) { @@ -646,46 +651,125 @@ function updateWorkers(data) { // Queue status if (data.queue) { - if ($('queuePending')) $('queuePending').textContent = fmt(data.queue.pending || 0); - if ($('queueClaimed')) $('queueClaimed').textContent = fmt(data.queue.claimed || 0); - if ($('queueDue')) $('queueDue').textContent = fmt(data.queue.due || 0); + var q = data.queue; + var pct = q.session_pct || 0; + + if ($('queueTotal')) $('queueTotal').textContent = fmt(q.total || 0); + if ($('queueUntested')) $('queueUntested').textContent = fmt(q.untested || 0); + if ($('queueClaimed')) $('queueClaimed').textContent = fmt(q.claimed || 0); + if ($('queueDue')) $('queueDue').textContent = fmt(q.due || 0); + if ($('queueSessionTested')) $('queueSessionTested').textContent = fmt(q.session_tested || 0); + if ($('queuePending')) $('queuePending').textContent = fmt(q.pending || 0); + if ($('queueTotal2')) $('queueTotal2').textContent = fmt(q.total || 0); + if ($('queueSessionPct')) $('queueSessionPct').textContent = pct + '%'; + if ($('queueProgressBar')) $('queueProgressBar').style.width = Math.min(pct, 100) + '%'; } // Update worker cards var container = $('workerCards'); if (!container) return; + var html = ''; + + // Helper to build a unified worker/manager card + function buildCard(opts) { + var rate = opts.rate || 0; + var successRate = opts.successRate || 0; + var queue = opts.queue || 0; + var rateClass = successRate >= 50 ? 'grn' : (successRate >= 20 ? 'yel' : 'red'); + var barColor = successRate >= 50 ? 'var(--green)' : (successRate >= 20 ? 'var(--yellow)' : 'var(--red)'); + var borderStyle = opts.isManager ? 'border:1px solid var(--cyan);box-shadow:0 0 12px rgba(56,189,248,0.15)' : ''; + var badges = '' + (opts.active ? 'ACTIVE' : 'OFFLINE') + ''; + if (opts.profiling) badges += 'PROF'; + // Trust indicator for workers with verified results + if (opts.trustScore !== undefined && opts.trustScore < 0.8 && !opts.isManager) { + var trustClass = opts.trustScore < 0.5 ? 'tag-err' : 'tag-warn'; + badges += 'LOW TRUST'; + } + // Calculate ETA based on queue and rate + var eta = '-'; + if (rate > 0 && queue > 0) { + var secs = Math.round(queue / rate); + if (secs < 60) eta = secs + 's'; + else if (secs < 3600) eta = Math.round(secs / 60) + 'm'; + else if (secs < 86400) eta = Math.round(secs / 3600) + 'h'; + else eta = Math.round(secs / 86400) + 'd'; + } + + return '
' + + '
' + + '' + opts.name + '' + + '' + badges + '' + + '
' + + '
' + + '
Rate' + (rate > 0 ? rate.toFixed(1) + '/s' : '-') + '
' + + '
Tested' + fmt(opts.tested) + '
' + + '
Working' + fmt(opts.working) + '
' + + '
Success' + successRate.toFixed(1) + '%
' + + '
Queue' + fmt(queue) + '
' + + '
ETA' + eta + '
' + + '
' + + '
' + + '
' + opts.barLabel + '
' + + '
' + + '
' + + '
' + + '
' + + '
' + opts.footer + '
' + + '
'; + } + + // Global queue info for all cards + var globalQueue = data.queue ? data.queue.due : 0; + + // Manager card (if manager has local testing enabled) + if (data.manager) { + var m = data.manager; + html += buildCard({ + name: 'Manager', + isManager: true, + active: true, + rate: m.rate, + tested: m.tested, + working: m.passed, + successRate: m.success_rate, + queue: globalQueue, + barLabel: m.threads + ' threads | Success Rate', + footer: 'Uptime: ' + formatAge(m.uptime).replace(' ago', ''), + profiling: false + }); + } + if (!data.workers || data.workers.length === 0) { - container.innerHTML = '
' + - '
No workers connected
' + - '
Add workers with: python ppf.py --register --server URL
'; + if (!data.manager) { + container.innerHTML = '
' + + '
No workers connected
' + + '
Add workers with: python ppf.py --register --server URL
'; + return; + } + container.innerHTML = html; return; } - var html = ''; data.workers.forEach(function(w) { - var statusClass = w.active ? 'grn' : 'red'; - var statusText = w.active ? 'ACTIVE' : 'OFFLINE'; - var successRate = w.success_rate || 0; - var testRate = w.test_rate || 0; - var rateClass = successRate >= 50 ? 'grn' : (successRate >= 20 ? 'yel' : 'red'); - var profBadge = w.profiling ? 'PROF' : ''; - - html += '
' + - '
' + - '' + w.name + '' + - '' + statusText + '' + profBadge + '' + - '
' + - '
' + - '
Rate' + (testRate > 0 ? testRate.toFixed(1) + '/s' : '-') + '
' + - '
Tested' + fmt(w.proxies_tested) + '
' + - '
Working' + fmt(w.proxies_working) + '
' + - '
Success' + successRate.toFixed(1) + '%
' + - '
' + - '
' + - 'IP: ' + w.ip + ' | Last: ' + formatAge(w.age) + - '
' + - '
'; + var threadLabel = w.threads > 0 ? (w.threads + ' threads | ') : ''; + var trustLabel = w.verifications > 0 ? ('Trust: ' + (w.trust_score * 100).toFixed(0) + '%') : ''; + var footerInfo = 'Last seen: ' + formatAge(w.age); + if (trustLabel) footerInfo += ' | ' + trustLabel; + html += buildCard({ + name: w.name, + isManager: false, + active: w.active, + rate: w.test_rate, + tested: w.proxies_tested, + working: w.proxies_working, + successRate: w.success_rate, + queue: globalQueue, + barLabel: threadLabel + 'Success Rate', + footer: footerInfo, + profiling: w.profiling, + trustScore: w.trust_score + }); }); container.innerHTML = html; }