httpd: add worker test rate tracking

Track per-worker test rates using 120s sliding window.
Display combined rate in dashboard and individual rates
in worker cards.
This commit is contained in:
Username
2025-12-28 16:43:53 +01:00
parent 2bc00d3ebd
commit 18ae73bfb8
2 changed files with 51 additions and 8 deletions

View File

@@ -86,6 +86,11 @@ _master_key = None # master key for worker registration
_claim_timeout = 300 # seconds before unclaimed work is released
_workers_file = 'data/workers.json' # persistent storage
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
def load_workers():
"""Load worker registry from disk."""
global _workers, _worker_keys
@@ -187,6 +192,39 @@ def update_worker_heartbeat(worker_id):
if worker_id in _workers:
_workers[worker_id]['last_seen'] = time.time()
def record_test_rate(worker_id, count):
"""Record test submission for rate calculation."""
now = time.time()
with _worker_test_history_lock:
if worker_id not in _worker_test_history:
_worker_test_history[worker_id] = []
_worker_test_history[worker_id].append((now, count))
# Prune old entries
cutoff = now - _test_history_window
_worker_test_history[worker_id] = [
(t, c) for t, c in _worker_test_history[worker_id] if t > cutoff
]
def get_worker_test_rate(worker_id):
"""Calculate worker's test rate (tests/sec) over recent window."""
now = time.time()
with _worker_test_history_lock:
if worker_id not in _worker_test_history:
return 0.0
history = _worker_test_history[worker_id]
if not history:
return 0.0
# Sum tests in window and calculate rate
cutoff = now - _test_history_window
total_tests = sum(c for t, c in history if t > cutoff)
oldest = min((t for t, c in history if t > cutoff), default=now)
elapsed = now - oldest
if elapsed < 1:
return 0.0
return total_tests / elapsed
def claim_work(db, worker_id, count=100):
"""Claim a batch of proxies for testing. Returns list of proxy dicts."""
now = time.time()
@@ -325,6 +363,9 @@ def submit_results(db, worker_id, results):
w['last_batch_size'] = len(results)
w['last_batch_working'] = working_count
# Record for test rate calculation
record_test_rate(worker_id, processed)
# Save workers periodically (every 60s)
if now - _last_workers_save > 60:
save_workers()
@@ -1292,6 +1333,8 @@ class ProxyAPIServer(threading.Thread):
tor_last_check = info.get('tor_last_check', 0)
tor_age = int(now - tor_last_check) if tor_last_check else None
worker_profiling = info.get('profiling', False)
# Calculate test rate for this worker
test_rate = get_worker_test_rate(wid)
workers.append({
'id': wid,
'name': info['name'],
@@ -1308,6 +1351,7 @@ class ProxyAPIServer(threading.Thread):
'last_batch_size': info.get('last_batch_size', 0),
'last_batch_working': info.get('last_batch_working', 0),
'active': (now - info['last_seen']) < 120,
'test_rate': round(test_rate, 2),
'tor_ok': tor_ok,
'tor_ip': tor_ip,
'tor_age': tor_age,
@@ -1340,6 +1384,8 @@ class ProxyAPIServer(threading.Thread):
db.close()
except Exception:
pass
# Calculate combined test rate from active workers
combined_rate = sum(w['test_rate'] for w in workers if w['active'])
return json.dumps({
'workers': workers,
'total': len(workers),
@@ -1349,6 +1395,7 @@ class ProxyAPIServer(threading.Thread):
'total_working': total_working,
'total_failed': total_failed,
'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0,
'combined_rate': round(combined_rate, 2),
},
'queue': queue_stats,
}, indent=2), 'application/json', 200

View File

@@ -619,13 +619,8 @@ function updateWorkers(data) {
// Main panel distributed workers
if ($('dwTested')) $('dwTested').textContent = fmt(data.summary.total_tested);
if ($('dwWorking')) $('dwWorking').textContent = fmt(data.summary.total_working);
// Calculate combined rate from worker stats
var combinedRate = 0;
if (data.workers) {
data.workers.forEach(function(w) {
if (w.active && w.test_rate) combinedRate += w.test_rate;
});
}
// Combined rate from summary
var combinedRate = data.summary.combined_rate || 0;
if ($('dwRate')) $('dwRate').textContent = combinedRate > 0 ? combinedRate.toFixed(1) + '/s' : '-';
}
@@ -652,6 +647,7 @@ function updateWorkers(data) {
var statusClass = w.active ? 'grn' : 'red';
var statusText = w.active ? 'ACTIVE' : 'OFFLINE';
var successRate = w.success_rate || 0;
var testRate = w.test_rate || 0;
var rateClass = successRate >= 50 ? 'grn' : (successRate >= 20 ? 'yel' : 'red');
var profBadge = w.profiling ? '<span class="tag tag-warn" style="margin-left:4px;font-size:9px">PROF</span>' : '';
@@ -661,10 +657,10 @@ function updateWorkers(data) {
'<span><span class="tag tag-' + (w.active ? 'ok' : 'err') + '">' + statusText + '</span>' + profBadge + '</span>' +
'</div>' +
'<div class="stats-wrap" style="margin:0">' +
'<div class="stat-row"><span class="stat-lbl">Rate</span><span class="stat-val cyn">' + (testRate > 0 ? testRate.toFixed(1) + '/s' : '-') + '</span></div>' +
'<div class="stat-row"><span class="stat-lbl">Tested</span><span class="stat-val">' + fmt(w.proxies_tested) + '</span></div>' +
'<div class="stat-row"><span class="stat-lbl">Working</span><span class="stat-val grn">' + fmt(w.proxies_working) + '</span></div>' +
'<div class="stat-row"><span class="stat-lbl">Success</span><span class="stat-val ' + rateClass + '">' + successRate.toFixed(1) + '%</span></div>' +
'<div class="stat-row"><span class="stat-lbl">Jobs</span><span class="stat-val">' + fmt(w.jobs_completed) + '</span></div>' +
'</div>' +
'<div style="font-size:0.75em;color:var(--dim);margin-top:8px">' +
'IP: ' + w.ip + ' | Last: ' + formatAge(w.age) +