httpd: remove V1 work distribution and result submission
Drop _work_claims tracking, claim_work(), submit_results(), get_due_proxy_count(), calculate_fair_batch_size(), and the /api/work + /api/results endpoint handlers.
This commit is contained in:
336
httpd.py
336
httpd.py
@@ -73,11 +73,8 @@ import string
|
||||
|
||||
_workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
|
||||
_workers_lock = threading.Lock()
|
||||
_work_claims = {} # proxy_key -> {worker_id, claimed_at}
|
||||
_work_claims_lock = threading.Lock()
|
||||
_worker_keys = set() # valid API keys
|
||||
_master_key = None # master key for worker registration
|
||||
_claim_timeout = 300 # seconds before unclaimed work is released
|
||||
_workers_file = 'data/workers.json' # persistent storage
|
||||
|
||||
# URL claim tracking (parallel to proxy claims)
|
||||
@@ -101,11 +98,6 @@ _worker_test_history = {}
|
||||
_worker_test_history_lock = threading.Lock()
|
||||
_test_history_window = 120 # seconds to keep test history for rate calculation
|
||||
|
||||
# Fair distribution settings
|
||||
_min_batch_size = 1 # minimum proxies per batch
|
||||
_max_batch_size = 10000 # maximum proxies per batch
|
||||
_worker_timeout = 120 # seconds before worker considered inactive
|
||||
|
||||
# Session tracking
|
||||
_session_start_time = int(time.time()) # when httpd started
|
||||
|
||||
@@ -171,53 +163,6 @@ def _build_due_condition():
|
||||
return condition, params
|
||||
|
||||
|
||||
def get_active_worker_count():
|
||||
"""Count workers seen within timeout window."""
|
||||
now = time.time()
|
||||
with _workers_lock:
|
||||
return sum(1 for w in _workers.values()
|
||||
if (now - w.get('last_seen', 0)) < _worker_timeout)
|
||||
|
||||
|
||||
def get_due_proxy_count(db):
|
||||
"""Count proxies due for testing (not claimed)."""
|
||||
with _work_claims_lock:
|
||||
claimed_count = len(_work_claims)
|
||||
|
||||
try:
|
||||
condition, params = _build_due_condition()
|
||||
query = 'SELECT COUNT(*) FROM proxylist WHERE ' + condition
|
||||
result = db.execute(query, params).fetchone()
|
||||
total_due = result[0] if result else 0
|
||||
return max(0, total_due - claimed_count)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def calculate_fair_batch_size(db, worker_id):
|
||||
"""Calculate fair batch size based on active workers and queue size.
|
||||
|
||||
Divides due work evenly among active workers. No artificial floor —
|
||||
if only 6 proxies are due with 3 workers, each gets 2.
|
||||
"""
|
||||
active_workers = max(1, get_active_worker_count())
|
||||
due_count = get_due_proxy_count(db)
|
||||
|
||||
if due_count == 0:
|
||||
return 0
|
||||
|
||||
# Fair share: divide due work evenly among active workers
|
||||
fair_share = max(1, int(due_count / active_workers))
|
||||
|
||||
# Clamp to upper bound only
|
||||
batch_size = min(fair_share, _max_batch_size)
|
||||
|
||||
_log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
|
||||
due_count, active_workers, fair_share, batch_size), 'debug')
|
||||
|
||||
return batch_size
|
||||
|
||||
|
||||
def load_workers():
|
||||
"""Load worker registry from disk."""
|
||||
global _workers, _worker_keys
|
||||
@@ -355,101 +300,6 @@ def get_worker_test_rate(worker_id):
|
||||
return 0.0
|
||||
return total_tests / elapsed
|
||||
|
||||
def claim_work(db, worker_id, count=100):
|
||||
"""Claim a batch of proxies for testing. Returns list of proxy dicts."""
|
||||
now = time.time()
|
||||
now_int = int(now)
|
||||
|
||||
# Calculate fair batch size based on active workers and queue size
|
||||
# Distributes work evenly: due_proxies / active_workers (with bounds)
|
||||
target_count = calculate_fair_batch_size(db, worker_id)
|
||||
|
||||
# Clean up stale claims and get current claimed set
|
||||
with _work_claims_lock:
|
||||
stale = [k for k, v in _work_claims.items() if now - v['claimed_at'] > _claim_timeout]
|
||||
for k in stale:
|
||||
del _work_claims[k]
|
||||
# Copy current claims to exclude from query
|
||||
claimed_keys = set(_work_claims.keys())
|
||||
|
||||
# Get proxies that need testing
|
||||
# Priority: untested first, then oldest due - with randomness within tiers
|
||||
try:
|
||||
# Build exclusion clause for already-claimed proxies
|
||||
# Use ip||':'||port to match our claim key format
|
||||
if claimed_keys:
|
||||
# SQLite placeholder limit is ~999, chunk if needed
|
||||
placeholders = ','.join('?' for _ in claimed_keys)
|
||||
exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders
|
||||
exclude_params = list(claimed_keys)
|
||||
else:
|
||||
exclude_clause = ""
|
||||
exclude_params = []
|
||||
|
||||
# Build due condition using new schedule formula
|
||||
due_condition, due_params = _build_due_condition()
|
||||
|
||||
# Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due
|
||||
# Calculate overdue time based on new formula
|
||||
if _fail_retry_backoff:
|
||||
overdue_calc = '''
|
||||
CASE WHEN failed = 0
|
||||
THEN ? - (tested + ?)
|
||||
ELSE ? - (tested + (failed * ?))
|
||||
END
|
||||
'''
|
||||
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
|
||||
else:
|
||||
overdue_calc = '''
|
||||
CASE WHEN failed = 0
|
||||
THEN ? - (tested + ?)
|
||||
ELSE ? - (tested + ?)
|
||||
END
|
||||
'''
|
||||
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
|
||||
|
||||
query = '''
|
||||
SELECT ip, port, proto, failed, source_proto,
|
||||
CASE
|
||||
WHEN tested IS NULL THEN 0
|
||||
WHEN (%s) > 3600 THEN 1
|
||||
ELSE 2
|
||||
END as priority
|
||||
FROM proxylist
|
||||
WHERE %s
|
||||
%s
|
||||
ORDER BY priority, RANDOM()
|
||||
LIMIT ?
|
||||
''' % (overdue_calc, due_condition, exclude_clause)
|
||||
|
||||
params = priority_params + list(due_params) + exclude_params + [target_count]
|
||||
rows = db.execute(query, params).fetchall()
|
||||
except Exception as e:
|
||||
_log('claim_work query error: %s' % e, 'error')
|
||||
return []
|
||||
|
||||
# Claim the fetched proxies (already filtered by query)
|
||||
claimed = []
|
||||
with _work_claims_lock:
|
||||
for row in rows:
|
||||
proxy_key = '%s:%s' % (row[0], row[1])
|
||||
# Double-check not claimed (race condition protection)
|
||||
if proxy_key not in _work_claims:
|
||||
_work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now}
|
||||
claimed.append({
|
||||
'ip': row[0],
|
||||
'port': row[1],
|
||||
'proto': row[2],
|
||||
'failed': row[3],
|
||||
'source_proto': row[4],
|
||||
})
|
||||
|
||||
if claimed:
|
||||
_log('claim_work: %d proxies to %s (pool: %d claimed)' % (
|
||||
len(claimed), worker_id[:8], len(_work_claims)), 'info')
|
||||
|
||||
return claimed
|
||||
|
||||
def claim_urls(url_db, worker_id, count=5):
|
||||
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
|
||||
|
||||
@@ -795,126 +645,6 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
return processed
|
||||
|
||||
|
||||
_last_workers_save = 0
|
||||
|
||||
def submit_results(db, worker_id, results):
|
||||
"""Process test results from a worker. Returns count of processed results."""
|
||||
global _last_workers_save
|
||||
processed = 0
|
||||
working_count = 0
|
||||
total_latency = 0
|
||||
now = time.time()
|
||||
|
||||
with _workers_lock:
|
||||
if worker_id in _workers:
|
||||
_workers[worker_id]['last_seen'] = now
|
||||
|
||||
for r in results:
|
||||
proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', ''))
|
||||
|
||||
# Release claim
|
||||
with _work_claims_lock:
|
||||
if proxy_key in _work_claims:
|
||||
del _work_claims[proxy_key]
|
||||
|
||||
# Update database - trust workers, add missing proxies if working
|
||||
try:
|
||||
working = 1 if r.get('working') else 0
|
||||
latency_ms = r.get('latency', 0) if working else None
|
||||
error_cat = r.get('error_category') if not working else None
|
||||
|
||||
if working:
|
||||
# Use INSERT OR REPLACE to add working proxies that don't exist
|
||||
db.execute('''
|
||||
INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added)
|
||||
VALUES (?, ?, ?, ?, 0, ?, ?, ?)
|
||||
ON CONFLICT(proxy) DO UPDATE SET
|
||||
failed = 0,
|
||||
tested = excluded.tested,
|
||||
avg_latency = excluded.avg_latency
|
||||
''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now),
|
||||
latency_ms, int(now)))
|
||||
working_count += 1
|
||||
total_latency += latency_ms or 0
|
||||
|
||||
# Geolocate working proxy if IP2Location available
|
||||
if _geolite and _geodb:
|
||||
try:
|
||||
rec = _geodb.get_all(r['ip'])
|
||||
if rec and rec.country_short and rec.country_short != '-':
|
||||
db.execute(
|
||||
'UPDATE proxylist SET country=? WHERE proxy=?',
|
||||
(rec.country_short, proxy_key))
|
||||
except Exception:
|
||||
pass # Geolocation is best-effort
|
||||
else:
|
||||
# For failures, only update if exists (don't add non-working proxies)
|
||||
db.execute('''
|
||||
UPDATE proxylist SET
|
||||
failed = failed + 1,
|
||||
tested = ?
|
||||
WHERE ip = ? AND port = ?
|
||||
''', (int(now), r['ip'], r['port']))
|
||||
|
||||
# Record result for verification system
|
||||
insert_proxy_result(db, proxy_key, worker_id, working,
|
||||
latency_ms=latency_ms, error_category=error_cat)
|
||||
|
||||
# Check for disagreement with other workers
|
||||
disagreement, other_worker, other_result = check_for_disagreement(
|
||||
db, proxy_key, worker_id, working)
|
||||
if disagreement:
|
||||
# Queue for manager verification (priority 3 = high)
|
||||
queue_verification(db, proxy_key, 'disagreement', priority=3,
|
||||
worker_a=worker_id, worker_b=other_worker,
|
||||
result_a=working, result_b=other_result)
|
||||
elif working:
|
||||
# Check for resurrection: was dead (failed >= 3), now working
|
||||
row = db.execute(
|
||||
'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,)
|
||||
).fetchone()
|
||||
if row and row[0] >= 3:
|
||||
queue_verification(db, proxy_key, 'resurrection', priority=3,
|
||||
worker_a=worker_id, result_a=1)
|
||||
else:
|
||||
# Check for sudden death: was working (consecutive_success >= 3), now failed
|
||||
row = db.execute(
|
||||
'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,)
|
||||
).fetchone()
|
||||
if row and row[0] and row[0] >= 3:
|
||||
queue_verification(db, proxy_key, 'sudden_death', priority=2,
|
||||
worker_a=worker_id, result_a=0)
|
||||
|
||||
processed += 1
|
||||
except Exception as e:
|
||||
_log('submit_results db error for %s: %s' % (proxy_key, e), 'error')
|
||||
|
||||
# Update worker stats
|
||||
with _workers_lock:
|
||||
if worker_id in _workers:
|
||||
w = _workers[worker_id]
|
||||
w['jobs_completed'] += 1
|
||||
w['proxies_tested'] += processed
|
||||
w['proxies_working'] = w.get('proxies_working', 0) + working_count
|
||||
w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count)
|
||||
w['total_latency'] = w.get('total_latency', 0) + total_latency
|
||||
w['last_batch_size'] = len(results)
|
||||
w['last_batch_working'] = working_count
|
||||
|
||||
# Commit database changes
|
||||
db.commit()
|
||||
|
||||
# Record for test rate calculation
|
||||
record_test_rate(worker_id, processed)
|
||||
|
||||
# Save workers periodically (every 60s)
|
||||
if now - _last_workers_save > 60:
|
||||
save_workers()
|
||||
_last_workers_save = now
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
def is_localhost(ip):
|
||||
"""Check if IP is localhost (127.0.0.0/8 or ::1)."""
|
||||
if not ip:
|
||||
@@ -1605,7 +1335,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
return [b'Method not allowed']
|
||||
|
||||
# POST only allowed for worker API endpoints
|
||||
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat',
|
||||
post_endpoints = ('/api/register', '/api/heartbeat',
|
||||
'/api/report-urls', '/api/report-proxies')
|
||||
if method == 'POST' and path not in post_endpoints:
|
||||
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
|
||||
@@ -1673,8 +1403,6 @@ class ProxyAPIServer(threading.Thread):
|
||||
'/api/stats': 'runtime statistics (JSON)',
|
||||
'/api/mitm': 'MITM certificate statistics (JSON)',
|
||||
'/api/countries': 'proxy counts by country (JSON)',
|
||||
'/api/work': 'get work batch for worker (params: key, count)',
|
||||
'/api/results': 'submit test results (POST, params: key)',
|
||||
'/api/register': 'register as worker (POST)',
|
||||
'/api/workers': 'list connected workers',
|
||||
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
|
||||
@@ -1941,54 +1669,6 @@ class ProxyAPIServer(threading.Thread):
|
||||
'message': 'registered successfully',
|
||||
}), 'application/json', 200
|
||||
|
||||
elif path == '/api/work':
|
||||
# Get batch of proxies to test (GET)
|
||||
key = query_params.get('key', '')
|
||||
if not validate_worker_key(key):
|
||||
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
|
||||
worker_id, _ = get_worker_by_key(key)
|
||||
if not worker_id:
|
||||
return json.dumps({'error': 'worker not found'}), 'application/json', 404
|
||||
count = int(query_params.get('count', 100))
|
||||
count = min(count, 500) # Cap at 500
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
proxies = claim_work(db, worker_id, count)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
'worker_id': worker_id,
|
||||
'count': len(proxies),
|
||||
'proxies': proxies,
|
||||
}), 'application/json', 200
|
||||
except Exception as e:
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
elif path == '/api/results':
|
||||
# Submit test results (POST)
|
||||
key = query_params.get('key', '')
|
||||
if not validate_worker_key(key):
|
||||
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
|
||||
worker_id, _ = get_worker_by_key(key)
|
||||
if not worker_id:
|
||||
return json.dumps({'error': 'worker not found'}), 'application/json', 404
|
||||
if not post_data:
|
||||
return json.dumps({'error': 'POST body required'}), 'application/json', 400
|
||||
results = post_data.get('results', [])
|
||||
if not results:
|
||||
return json.dumps({'error': 'no results provided'}), 'application/json', 400
|
||||
working = sum(1 for r in results if r.get('working'))
|
||||
_log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info')
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
processed = submit_results(db, worker_id, results)
|
||||
return json.dumps({
|
||||
'worker_id': worker_id,
|
||||
'processed': processed,
|
||||
'message': 'results submitted',
|
||||
}), 'application/json', 200
|
||||
except Exception as e:
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
elif path == '/api/heartbeat':
|
||||
# Worker heartbeat with Tor status (POST)
|
||||
key = query_params.get('key', '')
|
||||
@@ -2132,11 +1812,8 @@ class ProxyAPIServer(threading.Thread):
|
||||
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
|
||||
due_params).fetchone()
|
||||
due_total = row[0] if row else 0
|
||||
# Subtract currently claimed
|
||||
with _work_claims_lock:
|
||||
claimed_count = len(_work_claims)
|
||||
stats['due'] = max(0, due_total - claimed_count)
|
||||
stats['claimed'] = claimed_count
|
||||
stats['due'] = due_total
|
||||
stats['claimed'] = 0
|
||||
except Exception as e:
|
||||
_log('_get_db_stats error: %s' % e, 'warn')
|
||||
return stats
|
||||
@@ -2218,16 +1895,13 @@ class ProxyAPIServer(threading.Thread):
|
||||
if queue_stats['total'] > 0:
|
||||
pct = 100.0 * queue_stats['session_tested'] / queue_stats['total']
|
||||
queue_stats['session_pct'] = round(min(pct, 100.0), 1)
|
||||
# Claimed = currently being tested by workers
|
||||
with _work_claims_lock:
|
||||
queue_stats['claimed'] = len(_work_claims)
|
||||
queue_stats['claimed'] = 0
|
||||
# Due = ready for testing (respecting cooldown)
|
||||
due_condition, due_params = _build_due_condition()
|
||||
row = db.execute(
|
||||
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
|
||||
due_params).fetchone()
|
||||
due_total = row[0] if row else 0
|
||||
queue_stats['due'] = max(0, due_total - queue_stats['claimed'])
|
||||
queue_stats['due'] = row[0] if row else 0
|
||||
except Exception as e:
|
||||
_log('_get_workers_data queue stats error: %s' % e, 'warn')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user