From 18ae73bfb866e6de73b0d919f22e7109fbac39ef Mon Sep 17 00:00:00 2001 From: Username Date: Sun, 28 Dec 2025 16:43:53 +0100 Subject: [PATCH] httpd: add worker test rate tracking Track per-worker test rates using 120s sliding window. Display combined rate in dashboard and individual rates in worker cards. --- httpd.py | 47 +++++++++++++++++++++++++++++++++++++++++++++ static/dashboard.js | 12 ++++-------- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/httpd.py b/httpd.py index f5d83f9..30d6b6e 100644 --- a/httpd.py +++ b/httpd.py @@ -86,6 +86,11 @@ _master_key = None # master key for worker registration _claim_timeout = 300 # seconds before unclaimed work is released _workers_file = 'data/workers.json' # persistent storage +# Test rate tracking: worker_id -> list of (timestamp, count) tuples +_worker_test_history = {} +_worker_test_history_lock = threading.Lock() +_test_history_window = 120 # seconds to keep test history for rate calculation + def load_workers(): """Load worker registry from disk.""" global _workers, _worker_keys @@ -187,6 +192,39 @@ def update_worker_heartbeat(worker_id): if worker_id in _workers: _workers[worker_id]['last_seen'] = time.time() + +def record_test_rate(worker_id, count): + """Record test submission for rate calculation.""" + now = time.time() + with _worker_test_history_lock: + if worker_id not in _worker_test_history: + _worker_test_history[worker_id] = [] + _worker_test_history[worker_id].append((now, count)) + # Prune old entries + cutoff = now - _test_history_window + _worker_test_history[worker_id] = [ + (t, c) for t, c in _worker_test_history[worker_id] if t > cutoff + ] + + +def get_worker_test_rate(worker_id): + """Calculate worker's test rate (tests/sec) over recent window.""" + now = time.time() + with _worker_test_history_lock: + if worker_id not in _worker_test_history: + return 0.0 + history = _worker_test_history[worker_id] + if not history: + return 0.0 + # Sum tests in window and calculate rate + cutoff = now - _test_history_window + total_tests = sum(c for t, c in history if t > cutoff) + oldest = min((t for t, c in history if t > cutoff), default=now) + elapsed = now - oldest + if elapsed < 1: + return 0.0 + return total_tests / elapsed + def claim_work(db, worker_id, count=100): """Claim a batch of proxies for testing. Returns list of proxy dicts.""" now = time.time() @@ -325,6 +363,9 @@ def submit_results(db, worker_id, results): w['last_batch_size'] = len(results) w['last_batch_working'] = working_count + # Record for test rate calculation + record_test_rate(worker_id, processed) + # Save workers periodically (every 60s) if now - _last_workers_save > 60: save_workers() @@ -1292,6 +1333,8 @@ class ProxyAPIServer(threading.Thread): tor_last_check = info.get('tor_last_check', 0) tor_age = int(now - tor_last_check) if tor_last_check else None worker_profiling = info.get('profiling', False) + # Calculate test rate for this worker + test_rate = get_worker_test_rate(wid) workers.append({ 'id': wid, 'name': info['name'], @@ -1308,6 +1351,7 @@ class ProxyAPIServer(threading.Thread): 'last_batch_size': info.get('last_batch_size', 0), 'last_batch_working': info.get('last_batch_working', 0), 'active': (now - info['last_seen']) < 120, + 'test_rate': round(test_rate, 2), 'tor_ok': tor_ok, 'tor_ip': tor_ip, 'tor_age': tor_age, @@ -1340,6 +1384,8 @@ class ProxyAPIServer(threading.Thread): db.close() except Exception: pass + # Calculate combined test rate from active workers + combined_rate = sum(w['test_rate'] for w in workers if w['active']) return json.dumps({ 'workers': workers, 'total': len(workers), @@ -1349,6 +1395,7 @@ class ProxyAPIServer(threading.Thread): 'total_working': total_working, 'total_failed': total_failed, 'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0, + 'combined_rate': round(combined_rate, 2), }, 'queue': queue_stats, }, indent=2), 'application/json', 200 diff --git a/static/dashboard.js b/static/dashboard.js index 0715072..2a44846 100644 --- a/static/dashboard.js +++ b/static/dashboard.js @@ -619,13 +619,8 @@ function updateWorkers(data) { // Main panel distributed workers if ($('dwTested')) $('dwTested').textContent = fmt(data.summary.total_tested); if ($('dwWorking')) $('dwWorking').textContent = fmt(data.summary.total_working); - // Calculate combined rate from worker stats - var combinedRate = 0; - if (data.workers) { - data.workers.forEach(function(w) { - if (w.active && w.test_rate) combinedRate += w.test_rate; - }); - } + // Combined rate from summary + var combinedRate = data.summary.combined_rate || 0; if ($('dwRate')) $('dwRate').textContent = combinedRate > 0 ? combinedRate.toFixed(1) + '/s' : '-'; } @@ -652,6 +647,7 @@ function updateWorkers(data) { var statusClass = w.active ? 'grn' : 'red'; var statusText = w.active ? 'ACTIVE' : 'OFFLINE'; var successRate = w.success_rate || 0; + var testRate = w.test_rate || 0; var rateClass = successRate >= 50 ? 'grn' : (successRate >= 20 ? 'yel' : 'red'); var profBadge = w.profiling ? 'PROF' : ''; @@ -661,10 +657,10 @@ function updateWorkers(data) { '' + statusText + '' + profBadge + '' + '' + '
' + + '
Rate' + (testRate > 0 ? testRate.toFixed(1) + '/s' : '-') + '
' + '
Tested' + fmt(w.proxies_tested) + '
' + '
Working' + fmt(w.proxies_working) + '
' + '
Success' + successRate.toFixed(1) + '%
' + - '
Jobs' + fmt(w.jobs_completed) + '
' + '
' + '
' + 'IP: ' + w.ip + ' | Last: ' + formatAge(w.age) +