diff --git a/ppf.py b/ppf.py index e999436..dca207b 100644 --- a/ppf.py +++ b/ppf.py @@ -303,48 +303,6 @@ class NeedReregister(Exception): pass -def worker_get_work(server_url, worker_key, count=100): - """Fetch batch of proxies from master.""" - url = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count) - - try: - resp = urllib2.urlopen(url, timeout=30) - result = json.loads(resp.read()) - return result.get('proxies', []) - except urllib2.HTTPError as e: - if e.code == 403: - _log('worker key rejected (403), need to re-register', 'warn') - raise NeedReregister() - _log('failed to get work: %s' % e, 'error') - return [] - except Exception as e: - _log('failed to get work: %s' % e, 'error') - return [] - - -def worker_submit_results(server_url, worker_key, results): - """Submit test results to master.""" - url = '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key) - data = json.dumps({'results': results}) - - req = urllib2.Request(url, data) - req.add_header('Content-Type', 'application/json') - - try: - resp = urllib2.urlopen(req, timeout=30) - result = json.loads(resp.read()) - return result.get('processed', 0) - except urllib2.HTTPError as e: - if e.code == 403: - _log('worker key rejected (403), need to re-register', 'warn') - raise NeedReregister() - _log('failed to submit results: %s' % e, 'error') - return 0 - except Exception as e: - _log('failed to submit results: %s' % e, 'error') - return 0 - - def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0): """Send heartbeat with Tor status to master.""" url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key) @@ -470,309 +428,6 @@ def check_tor_connectivity(tor_hosts): return working, tor_ip -def worker_main(config): - """Worker mode main loop - uses proxywatchd multi-threaded testing.""" - import json - global urllib2 - - try: - import Queue - except ImportError: - import queue as Queue - - # Import proxywatchd for multi-threaded testing (gevent already patched at top) - import proxywatchd - proxywatchd.set_config(config) - - server_url = config.args.server - if not server_url: - _log('--server URL required for worker mode', 'error') - sys.exit(1) - - worker_key = config.args.worker_key - worker_name = config.args.worker_name or os.uname()[1] - batch_size = config.worker.batch_size - num_threads = config.watchd.threads - worker_id = None - - # Register if --register flag or no key provided - if config.args.register or not worker_key: - _log('registering with master: %s' % server_url, 'info') - worker_id, worker_key = worker_register(server_url, worker_name) - if not worker_key: - _log('registration failed, exiting', 'error') - sys.exit(1) - _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info') - _log('worker key: %s' % worker_key, 'info') - _log('save this key with --worker-key for future runs', 'info') - - if config.args.register: - # Just register and exit - return - - _log('starting worker mode', 'info') - _log(' server: %s' % server_url, 'info') - _log(' threads: %d' % num_threads, 'info') - _log(' batch size: %d' % batch_size, 'info') - _log(' tor hosts: %s' % config.common.tor_hosts, 'info') - - # Verify Tor connectivity before claiming work - import socket - import socks - working_tor_hosts = [] - for tor_host in config.torhosts: - host, port = tor_host.split(':') - port = int(port) - try: - # Test SOCKS connection - test_sock = socks.socksocket() - test_sock.set_proxy(socks.SOCKS5, host, port) - test_sock.settimeout(10) - test_sock.connect(('check.torproject.org', 80)) - test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') - resp = test_sock.recv(512) - test_sock.close() - # Accept any HTTP response (200, 301, 302, etc.) - if resp and (b'HTTP/' in resp or len(resp) > 0): - status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50] - _log('tor host %s:%d OK (%s)' % (host, port, status), 'info') - working_tor_hosts.append(tor_host) - else: - _log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn') - except Exception as e: - _log('tor host %s:%d failed: %s' % (host, port, e), 'warn') - - if not working_tor_hosts: - _log('no working Tor hosts, cannot start worker', 'error') - sys.exit(1) - - _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info') - - # Create shared queues for worker threads - job_queue = proxywatchd.PriorityJobQueue() - completion_queue = Queue.Queue() - - # Spawn worker threads with stagger to avoid overwhelming Tor - threads = [] - for i in range(num_threads): - wt = proxywatchd.WorkerThread('w%d' % i, job_queue) - wt.start_thread() - threads.append(wt) - time.sleep(random.random() / 10) # 0-100ms stagger per thread - - _log('spawned %d worker threads' % len(threads), 'info') - - jobs_completed = 0 - proxies_tested = 0 - start_time = time.time() - current_tor_ip = None - consecutive_tor_failures = 0 - worker_profiling = config.args.profile or config.common.profiling - # Use dict to allow mutation in nested function (Python 2 compatible) - wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10} - - def do_register(): - """Register with master, with exponential backoff on failure.""" - while True: - _log('registering with master: %s' % server_url, 'info') - new_id, new_key = worker_register(server_url, worker_name) - if new_key: - wstate['worker_id'] = new_id - wstate['worker_key'] = new_key - wstate['backoff'] = 10 # Reset backoff on success - _log('registered as %s (id: %s)' % (worker_name, new_id), 'info') - return True - else: - _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn') - time.sleep(wstate['backoff']) - wstate['backoff'] = min(wstate['backoff'] * 2, 300) # Max 5 min backoff - - def wait_for_tor(): - """Wait for Tor to become available, checking every 30 seconds.""" - check_interval = 30 - while True: - working, tor_ip = check_tor_connectivity(config.torhosts) - if working: - _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info') - # Send heartbeat to manager - try: - worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads) - except NeedReregister: - do_register() - return working, tor_ip - _log('tor still down, retrying in %ds' % check_interval, 'warn') - # Send heartbeat with tor_ok=False - try: - worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads) - except NeedReregister: - do_register() - time.sleep(check_interval) - - try: - while True: - # Tor check before claiming work - don't claim if Tor is down - working, tor_ip = check_tor_connectivity(config.torhosts) - if not working: - consecutive_tor_failures += 1 - _log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn') - try: - worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads) - except NeedReregister: - do_register() - if consecutive_tor_failures >= 2: - _log('tor appears down, waiting before claiming work', 'error') - working, current_tor_ip = wait_for_tor() - consecutive_tor_failures = 0 - else: - time.sleep(10) - continue - else: - consecutive_tor_failures = 0 - if tor_ip != current_tor_ip: - if current_tor_ip: - _log('tor circuit rotated: %s' % tor_ip, 'info') - current_tor_ip = tor_ip - # Send heartbeat to manager - try: - worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads) - except NeedReregister: - do_register() - - # Get work from master - try: - proxies = worker_get_work(server_url, wstate['worker_key'], batch_size) - except NeedReregister: - do_register() - continue - - if not proxies: - _log('no work available, sleeping 30s', 'info') - time.sleep(30) - continue - - _log('received %d proxies to test' % len(proxies), 'info') - - # Create ProxyTestState and jobs for each proxy - pending_states = {} - all_jobs = [] - - # Get checktype(s) from config - checktypes = config.watchd.checktypes - - for proxy_info in proxies: - ip = proxy_info['ip'] - port = proxy_info['port'] - proto = proxy_info.get('proto', 'http') - failed = proxy_info.get('failed', 0) - source_proto = proxy_info.get('source_proto') - proxy_str = '%s:%d' % (ip, port) - - # Create state for this proxy - state = proxywatchd.ProxyTestState( - ip, port, proto, failed, - success_count=0, total_duration=0.0, - country=None, mitm=0, consecutive_success=0, - asn=None, oldies=False, - completion_queue=completion_queue, - proxy_full=proxy_str, source_proto=source_proto - ) - pending_states[proxy_str] = state - - # Select random checktype - checktype = random.choice(checktypes) - - # Get target for this checktype - if checktype == 'judges': - available = proxywatchd.judge_stats.get_available_judges( - list(proxywatchd.judges.keys())) - target = random.choice(available) if available else random.choice( - list(proxywatchd.judges.keys())) - elif checktype == 'ssl': - target = random.choice(proxywatchd.ssl_targets) - elif checktype == 'irc': - target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667' - else: # head - target = random.choice(list(proxywatchd.regexes.keys())) - - job = proxywatchd.TargetTestJob(state, target, checktype) - all_jobs.append(job) - - # Shuffle and queue jobs - random.shuffle(all_jobs) - for job in all_jobs: - job_queue.put(job, priority=0) - - # Wait for all jobs to complete - completed = 0 - results = [] - timeout_start = time.time() - timeout_seconds = config.watchd.timeout * 2 + 30 # generous timeout - - while completed < len(proxies): - try: - state = completion_queue.get(timeout=1) - completed += 1 - - # Build result from state (failcount == 0 means success) - is_working = state.failcount == 0 - latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0 - result = { - 'ip': state.ip, - 'port': state.port, - 'proto': state.proto, - 'working': is_working, - 'latency': round(latency_sec, 3) if is_working else 0, - 'error': None if is_working else 'failed', - } - results.append(result) - - # Progress logging - if completed % 20 == 0 or completed == len(proxies): - working = sum(1 for r in results if r.get('working')) - _log('tested %d/%d proxies (%d working)' % ( - completed, len(proxies), working), 'info') - - except Queue.Empty: - if time.time() - timeout_start > timeout_seconds: - _log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn') - break - continue - - # Submit results - try: - processed = worker_submit_results(server_url, wstate['worker_key'], results) - except NeedReregister: - do_register() - # Retry submission with new key - try: - processed = worker_submit_results(server_url, wstate['worker_key'], results) - except NeedReregister: - _log('still rejected after re-register, discarding batch', 'error') - processed = 0 - - jobs_completed += 1 - proxies_tested += len(results) - - working = sum(1 for r in results if r.get('working')) - _log('batch %d: %d/%d working, submitted %d' % ( - jobs_completed, working, len(results), processed), 'info') - - # Brief pause between batches - time.sleep(1) - - except KeyboardInterrupt: - elapsed = time.time() - start_time - _log('worker stopping...', 'info') - # Stop threads - for wt in threads: - wt.stop() - for wt in threads: - wt.term() - _log('worker stopped after %s' % format_duration(int(elapsed)), 'info') - _log(' jobs completed: %d' % jobs_completed, 'info') - _log(' proxies tested: %d' % proxies_tested, 'info') - - def worker_v2_main(config): """V2 worker mode -- URL-driven discovery. @@ -1193,16 +848,11 @@ def main(): else: sys.exit(1) - # V2 worker mode: URL-driven discovery - if config.args.worker_v2: + # Worker mode: URL-driven discovery + if config.args.worker_v2 or config.args.register: worker_v2_main(config) return - # V1 worker mode: connect to master server instead of running locally - if config.args.worker or config.args.register: - worker_main(config) - return - proxydb = mysqlite.mysqlite(config.watchd.database, str) dbs.create_table_if_not_exists(proxydb, 'proxylist') fetch.init_known_proxies(proxydb)