diff --git a/httpd.py b/httpd.py index 5a01bb1..dc2780c 100644 --- a/httpd.py +++ b/httpd.py @@ -73,11 +73,8 @@ import string _workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...} _workers_lock = threading.Lock() -_work_claims = {} # proxy_key -> {worker_id, claimed_at} -_work_claims_lock = threading.Lock() _worker_keys = set() # valid API keys _master_key = None # master key for worker registration -_claim_timeout = 300 # seconds before unclaimed work is released _workers_file = 'data/workers.json' # persistent storage # URL claim tracking (parallel to proxy claims) @@ -101,11 +98,6 @@ _worker_test_history = {} _worker_test_history_lock = threading.Lock() _test_history_window = 120 # seconds to keep test history for rate calculation -# Fair distribution settings -_min_batch_size = 1 # minimum proxies per batch -_max_batch_size = 10000 # maximum proxies per batch -_worker_timeout = 120 # seconds before worker considered inactive - # Session tracking _session_start_time = int(time.time()) # when httpd started @@ -171,53 +163,6 @@ def _build_due_condition(): return condition, params -def get_active_worker_count(): - """Count workers seen within timeout window.""" - now = time.time() - with _workers_lock: - return sum(1 for w in _workers.values() - if (now - w.get('last_seen', 0)) < _worker_timeout) - - -def get_due_proxy_count(db): - """Count proxies due for testing (not claimed).""" - with _work_claims_lock: - claimed_count = len(_work_claims) - - try: - condition, params = _build_due_condition() - query = 'SELECT COUNT(*) FROM proxylist WHERE ' + condition - result = db.execute(query, params).fetchone() - total_due = result[0] if result else 0 - return max(0, total_due - claimed_count) - except Exception: - return 0 - - -def calculate_fair_batch_size(db, worker_id): - """Calculate fair batch size based on active workers and queue size. - - Divides due work evenly among active workers. No artificial floor — - if only 6 proxies are due with 3 workers, each gets 2. - """ - active_workers = max(1, get_active_worker_count()) - due_count = get_due_proxy_count(db) - - if due_count == 0: - return 0 - - # Fair share: divide due work evenly among active workers - fair_share = max(1, int(due_count / active_workers)) - - # Clamp to upper bound only - batch_size = min(fair_share, _max_batch_size) - - _log('fair_batch: due=%d workers=%d share=%d batch=%d' % ( - due_count, active_workers, fair_share, batch_size), 'debug') - - return batch_size - - def load_workers(): """Load worker registry from disk.""" global _workers, _worker_keys @@ -355,101 +300,6 @@ def get_worker_test_rate(worker_id): return 0.0 return total_tests / elapsed -def claim_work(db, worker_id, count=100): - """Claim a batch of proxies for testing. Returns list of proxy dicts.""" - now = time.time() - now_int = int(now) - - # Calculate fair batch size based on active workers and queue size - # Distributes work evenly: due_proxies / active_workers (with bounds) - target_count = calculate_fair_batch_size(db, worker_id) - - # Clean up stale claims and get current claimed set - with _work_claims_lock: - stale = [k for k, v in _work_claims.items() if now - v['claimed_at'] > _claim_timeout] - for k in stale: - del _work_claims[k] - # Copy current claims to exclude from query - claimed_keys = set(_work_claims.keys()) - - # Get proxies that need testing - # Priority: untested first, then oldest due - with randomness within tiers - try: - # Build exclusion clause for already-claimed proxies - # Use ip||':'||port to match our claim key format - if claimed_keys: - # SQLite placeholder limit is ~999, chunk if needed - placeholders = ','.join('?' for _ in claimed_keys) - exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders - exclude_params = list(claimed_keys) - else: - exclude_clause = "" - exclude_params = [] - - # Build due condition using new schedule formula - due_condition, due_params = _build_due_condition() - - # Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due - # Calculate overdue time based on new formula - if _fail_retry_backoff: - overdue_calc = ''' - CASE WHEN failed = 0 - THEN ? - (tested + ?) - ELSE ? - (tested + (failed * ?)) - END - ''' - priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval] - else: - overdue_calc = ''' - CASE WHEN failed = 0 - THEN ? - (tested + ?) - ELSE ? - (tested + ?) - END - ''' - priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval] - - query = ''' - SELECT ip, port, proto, failed, source_proto, - CASE - WHEN tested IS NULL THEN 0 - WHEN (%s) > 3600 THEN 1 - ELSE 2 - END as priority - FROM proxylist - WHERE %s - %s - ORDER BY priority, RANDOM() - LIMIT ? - ''' % (overdue_calc, due_condition, exclude_clause) - - params = priority_params + list(due_params) + exclude_params + [target_count] - rows = db.execute(query, params).fetchall() - except Exception as e: - _log('claim_work query error: %s' % e, 'error') - return [] - - # Claim the fetched proxies (already filtered by query) - claimed = [] - with _work_claims_lock: - for row in rows: - proxy_key = '%s:%s' % (row[0], row[1]) - # Double-check not claimed (race condition protection) - if proxy_key not in _work_claims: - _work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now} - claimed.append({ - 'ip': row[0], - 'port': row[1], - 'proto': row[2], - 'failed': row[3], - 'source_proto': row[4], - }) - - if claimed: - _log('claim_work: %d proxies to %s (pool: %d claimed)' % ( - len(claimed), worker_id[:8], len(_work_claims)), 'info') - - return claimed - def claim_urls(url_db, worker_id, count=5): """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts. @@ -795,126 +645,6 @@ def submit_proxy_reports(db, worker_id, proxies): return processed -_last_workers_save = 0 - -def submit_results(db, worker_id, results): - """Process test results from a worker. Returns count of processed results.""" - global _last_workers_save - processed = 0 - working_count = 0 - total_latency = 0 - now = time.time() - - with _workers_lock: - if worker_id in _workers: - _workers[worker_id]['last_seen'] = now - - for r in results: - proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', '')) - - # Release claim - with _work_claims_lock: - if proxy_key in _work_claims: - del _work_claims[proxy_key] - - # Update database - trust workers, add missing proxies if working - try: - working = 1 if r.get('working') else 0 - latency_ms = r.get('latency', 0) if working else None - error_cat = r.get('error_category') if not working else None - - if working: - # Use INSERT OR REPLACE to add working proxies that don't exist - db.execute(''' - INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added) - VALUES (?, ?, ?, ?, 0, ?, ?, ?) - ON CONFLICT(proxy) DO UPDATE SET - failed = 0, - tested = excluded.tested, - avg_latency = excluded.avg_latency - ''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now), - latency_ms, int(now))) - working_count += 1 - total_latency += latency_ms or 0 - - # Geolocate working proxy if IP2Location available - if _geolite and _geodb: - try: - rec = _geodb.get_all(r['ip']) - if rec and rec.country_short and rec.country_short != '-': - db.execute( - 'UPDATE proxylist SET country=? WHERE proxy=?', - (rec.country_short, proxy_key)) - except Exception: - pass # Geolocation is best-effort - else: - # For failures, only update if exists (don't add non-working proxies) - db.execute(''' - UPDATE proxylist SET - failed = failed + 1, - tested = ? - WHERE ip = ? AND port = ? - ''', (int(now), r['ip'], r['port'])) - - # Record result for verification system - insert_proxy_result(db, proxy_key, worker_id, working, - latency_ms=latency_ms, error_category=error_cat) - - # Check for disagreement with other workers - disagreement, other_worker, other_result = check_for_disagreement( - db, proxy_key, worker_id, working) - if disagreement: - # Queue for manager verification (priority 3 = high) - queue_verification(db, proxy_key, 'disagreement', priority=3, - worker_a=worker_id, worker_b=other_worker, - result_a=working, result_b=other_result) - elif working: - # Check for resurrection: was dead (failed >= 3), now working - row = db.execute( - 'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,) - ).fetchone() - if row and row[0] >= 3: - queue_verification(db, proxy_key, 'resurrection', priority=3, - worker_a=worker_id, result_a=1) - else: - # Check for sudden death: was working (consecutive_success >= 3), now failed - row = db.execute( - 'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,) - ).fetchone() - if row and row[0] and row[0] >= 3: - queue_verification(db, proxy_key, 'sudden_death', priority=2, - worker_a=worker_id, result_a=0) - - processed += 1 - except Exception as e: - _log('submit_results db error for %s: %s' % (proxy_key, e), 'error') - - # Update worker stats - with _workers_lock: - if worker_id in _workers: - w = _workers[worker_id] - w['jobs_completed'] += 1 - w['proxies_tested'] += processed - w['proxies_working'] = w.get('proxies_working', 0) + working_count - w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count) - w['total_latency'] = w.get('total_latency', 0) + total_latency - w['last_batch_size'] = len(results) - w['last_batch_working'] = working_count - - # Commit database changes - db.commit() - - # Record for test rate calculation - record_test_rate(worker_id, processed) - - # Save workers periodically (every 60s) - if now - _last_workers_save > 60: - save_workers() - _last_workers_save = now - - return processed - - def is_localhost(ip): """Check if IP is localhost (127.0.0.0/8 or ::1).""" if not ip: @@ -1605,7 +1335,7 @@ class ProxyAPIServer(threading.Thread): return [b'Method not allowed'] # POST only allowed for worker API endpoints - post_endpoints = ('/api/register', '/api/results', '/api/heartbeat', + post_endpoints = ('/api/register', '/api/heartbeat', '/api/report-urls', '/api/report-proxies') if method == 'POST' and path not in post_endpoints: start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) @@ -1673,8 +1403,6 @@ class ProxyAPIServer(threading.Thread): '/api/stats': 'runtime statistics (JSON)', '/api/mitm': 'MITM certificate statistics (JSON)', '/api/countries': 'proxy counts by country (JSON)', - '/api/work': 'get work batch for worker (params: key, count)', - '/api/results': 'submit test results (POST, params: key)', '/api/register': 'register as worker (POST)', '/api/workers': 'list connected workers', '/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)', @@ -1941,54 +1669,6 @@ class ProxyAPIServer(threading.Thread): 'message': 'registered successfully', }), 'application/json', 200 - elif path == '/api/work': - # Get batch of proxies to test (GET) - key = query_params.get('key', '') - if not validate_worker_key(key): - return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 - worker_id, _ = get_worker_by_key(key) - if not worker_id: - return json.dumps({'error': 'worker not found'}), 'application/json', 404 - count = int(query_params.get('count', 100)) - count = min(count, 500) # Cap at 500 - try: - db = mysqlite.mysqlite(self.database, str) - proxies = claim_work(db, worker_id, count) - update_worker_heartbeat(worker_id) - return json.dumps({ - 'worker_id': worker_id, - 'count': len(proxies), - 'proxies': proxies, - }), 'application/json', 200 - except Exception as e: - return json.dumps({'error': str(e)}), 'application/json', 500 - - elif path == '/api/results': - # Submit test results (POST) - key = query_params.get('key', '') - if not validate_worker_key(key): - return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 - worker_id, _ = get_worker_by_key(key) - if not worker_id: - return json.dumps({'error': 'worker not found'}), 'application/json', 404 - if not post_data: - return json.dumps({'error': 'POST body required'}), 'application/json', 400 - results = post_data.get('results', []) - if not results: - return json.dumps({'error': 'no results provided'}), 'application/json', 400 - working = sum(1 for r in results if r.get('working')) - _log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info') - try: - db = mysqlite.mysqlite(self.database, str) - processed = submit_results(db, worker_id, results) - return json.dumps({ - 'worker_id': worker_id, - 'processed': processed, - 'message': 'results submitted', - }), 'application/json', 200 - except Exception as e: - return json.dumps({'error': str(e)}), 'application/json', 500 - elif path == '/api/heartbeat': # Worker heartbeat with Tor status (POST) key = query_params.get('key', '') @@ -2132,11 +1812,8 @@ class ProxyAPIServer(threading.Thread): 'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, due_params).fetchone() due_total = row[0] if row else 0 - # Subtract currently claimed - with _work_claims_lock: - claimed_count = len(_work_claims) - stats['due'] = max(0, due_total - claimed_count) - stats['claimed'] = claimed_count + stats['due'] = due_total + stats['claimed'] = 0 except Exception as e: _log('_get_db_stats error: %s' % e, 'warn') return stats @@ -2218,16 +1895,13 @@ class ProxyAPIServer(threading.Thread): if queue_stats['total'] > 0: pct = 100.0 * queue_stats['session_tested'] / queue_stats['total'] queue_stats['session_pct'] = round(min(pct, 100.0), 1) - # Claimed = currently being tested by workers - with _work_claims_lock: - queue_stats['claimed'] = len(_work_claims) + queue_stats['claimed'] = 0 # Due = ready for testing (respecting cooldown) due_condition, due_params = _build_due_condition() row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, due_params).fetchone() - due_total = row[0] if row else 0 - queue_stats['due'] = max(0, due_total - queue_stats['claimed']) + queue_stats['due'] = row[0] if row else 0 except Exception as e: _log('_get_workers_data queue stats error: %s' % e, 'warn')