From 862eeed5c86675fddc6a445bdf30e074204e4320 Mon Sep 17 00:00:00 2001
From: Username
Date: Tue, 17 Feb 2026 14:23:58 +0100
Subject: [PATCH] ppf: add worker_v2_main() for URL-driven discovery

---
 ppf.py | 446 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 445 insertions(+), 1 deletion(-)

diff --git a/ppf.py b/ppf.py
index f2b4663..a65b03b 100644
--- a/ppf.py
+++ b/ppf.py
@@ -772,6 +772,445 @@ def worker_main(config):
         _log(' proxies tested: %d' % proxies_tested, 'info')
 
 
+def worker_v2_main(config):
+    """V2 worker mode -- URL-driven discovery.
+
+    Claims URLs from master, fetches through Tor, extracts and tests proxies,
+    reports working proxies back to master.
+
+    Loops until interrupted with Ctrl-C. Worker threads and the fetch session
+    are shut down on the way out, whatever ended the loop.
+
+    Calls sys.exit(1) when no server URL is configured, when the initial
+    registration fails, or when no Tor host passes the startup probe. With
+    --register it returns right after registering.
+    """
+    global urllib2
+
+    try:
+        import Queue
+    except ImportError:
+        import queue as Queue
+
+    import proxywatchd
+    proxywatchd.set_config(config)
+
+    server_url = config.args.server
+    if not server_url:
+        _log('--server URL required for worker mode', 'error')
+        sys.exit(1)
+
+    worker_key = config.args.worker_key
+    # Stays None when a key is supplied on the command line: only a fresh
+    # registration yields an id, yet wstate below always reads this name.
+    worker_id = None
+    worker_name = config.args.worker_name or os.uname()[1]
+    num_threads = config.watchd.threads
+    url_batch_size = config.worker.url_batch_size
+
+    # Register if --register flag or no key provided
+    if config.args.register or not worker_key:
+        _log('registering with master: %s' % server_url, 'info')
+        worker_id, worker_key = worker_register(server_url, worker_name)
+        if not worker_key:
+            _log('registration failed, exiting', 'error')
+            sys.exit(1)
+        _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
+        _log('worker key: %s' % worker_key, 'info')
+        _log('save this key with --worker-key for future runs', 'info')
+
+    if config.args.register:
+        return
+
+    _log('starting worker V2 mode (URL-driven)', 'info')
+    _log(' server: %s' % server_url, 'info')
+    _log(' threads: %d' % num_threads, 'info')
+    _log(' url batch: %d' % url_batch_size, 'info')
+    _log(' tor hosts: %s' % config.common.tor_hosts, 'info')
+
+    # Verify Tor connectivity before starting
+    import socks
+    working_tor_hosts = []
+    for tor_host in config.torhosts:
+        host, port = tor_host.split(':')
+        port = int(port)
+        test_sock = None
+        try:
+            test_sock = socks.socksocket()
+            test_sock.set_proxy(socks.SOCKS5, host, port)
+            test_sock.settimeout(10)
+            test_sock.connect(('check.torproject.org', 80))
+            test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
+            resp = test_sock.recv(512)
+            # Any non-empty reply counts as a working host
+            if resp:
+                status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
+                _log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
+                working_tor_hosts.append(tor_host)
+            else:
+                _log('tor host %s:%d no response' % (host, port), 'warn')
+        except Exception as e:
+            _log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
+        finally:
+            # Release the probe socket on the failure path as well
+            if test_sock is not None:
+                try:
+                    test_sock.close()
+                except Exception:
+                    pass
+
+    if not working_tor_hosts:
+        _log('no working Tor hosts, cannot start worker', 'error')
+        sys.exit(1)
+
+    _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
+
+    # Create shared queues for worker threads
+    job_queue = proxywatchd.PriorityJobQueue()
+    completion_queue = Queue.Queue()
+
+    # Spawn worker threads
+    threads = []
+    for i in range(num_threads):
+        wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
+        wt.start_thread()
+        threads.append(wt)
+        time.sleep(random.random() / 10)
+
+    _log('spawned %d worker threads' % len(threads), 'info')
+
+    # Session for fetching URLs through Tor
+    session = fetch.FetchSession()
+
+    cycles = 0
+    urls_fetched = 0
+    proxies_found = 0
+    proxies_working = 0
+    start_time = time.time()
+    current_tor_ip = None
+    consecutive_tor_failures = 0
+    worker_profiling = config.args.profile or config.common.profiling
+    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
+
+    def url_host(url):
+        """Return the host part of url for log lines, or url itself."""
+        parts = url.split('/')
+        return parts[2] if len(parts) > 2 else url
+
+    def split_addr(addr):
+        """Split [user:pass@]host:port into (host, port).
+
+        IPv6 hosts are expected as [ipv6]:port. Raises ValueError when the
+        address cannot be parsed.
+        """
+        addr_part = addr.split('@')[-1]
+        if addr_part.startswith('['):
+            bracket_end = addr_part.index(']')
+            return addr_part[1:bracket_end], int(addr_part[bracket_end + 2:])
+        ip, port_str = addr_part.rsplit(':', 1)
+        return ip, int(port_str)
+
+    def do_register():
+        """Register with master, with exponential backoff on failure."""
+        while True:
+            _log('registering with master: %s' % server_url, 'info')
+            new_id, new_key = worker_register(server_url, worker_name)
+            if new_key:
+                wstate['worker_id'] = new_id
+                wstate['worker_key'] = new_key
+                wstate['backoff'] = 10
+                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
+                return True
+            else:
+                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
+                time.sleep(wstate['backoff'])
+                wstate['backoff'] = min(wstate['backoff'] * 2, 300)
+
+    def wait_for_tor():
+        """Wait for Tor to become available, checking every 30 seconds."""
+        check_interval = 30
+        while True:
+            working, tor_ip = check_tor_connectivity(config.torhosts)
+            if working:
+                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
+                try:
+                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
+                except NeedReregister:
+                    do_register()
+                return working, tor_ip
+            _log('tor still down, retrying in %ds' % check_interval, 'warn')
+            try:
+                worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
+            except NeedReregister:
+                do_register()
+            time.sleep(check_interval)
+
+    try:
+        while True:
+            # Tor connectivity check
+            working, tor_ip = check_tor_connectivity(config.torhosts)
+            if not working:
+                consecutive_tor_failures += 1
+                _log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
+                try:
+                    worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
+                except NeedReregister:
+                    do_register()
+                if consecutive_tor_failures >= 2:
+                    _log('tor appears down, waiting before claiming URLs', 'error')
+                    working, current_tor_ip = wait_for_tor()
+                    consecutive_tor_failures = 0
+                else:
+                    time.sleep(10)
+                    continue
+            else:
+                consecutive_tor_failures = 0
+                if tor_ip != current_tor_ip:
+                    if current_tor_ip:
+                        _log('tor circuit rotated: %s' % tor_ip, 'info')
+                    current_tor_ip = tor_ip
+                try:
+                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
+                except NeedReregister:
+                    do_register()
+
+            # Claim URLs from master
+            try:
+                url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
+            except NeedReregister:
+                do_register()
+                continue
+
+            if not url_infos:
+                _log('no URLs available, sleeping 30s', 'info')
+                time.sleep(30)
+                continue
+
+            _log('claimed %d URLs to process' % len(url_infos), 'info')
+
+            # Phase 1: Fetch URLs and extract proxies
+            url_reports = []
+            all_extracted = []  # list of (addr, proto, confidence, source_url)
+
+            for url_info in url_infos:
+                url = url_info.get('url', '')
+                last_hash = url_info.get('last_hash')
+                proto_hint = url_info.get('proto_hint')
+                host = url_host(url)
+
+                fetch_start = time.time()
+                try:
+                    content = session.fetch(url)
+                except Exception as e:
+                    _log('%s: fetch error: %s' % (host, e), 'error')
+                    content = None
+
+                fetch_time_ms = int((time.time() - fetch_start) * 1000)
+                urls_fetched += 1
+
+                if not content:
+                    url_reports.append({
+                        'url': url,
+                        'success': False,
+                        'content_hash': None,
+                        'proxy_count': 0,
+                        'fetch_time_ms': fetch_time_ms,
+                        'changed': False,
+                        'error': 'fetch failed',
+                    })
+                    continue
+
+                # Detect protocol from URL path
+                proto = fetch.detect_proto_from_path(url) or proto_hint
+
+                # Extract proxies (no filter_known -- workers have no proxydb)
+                extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
+
+                # Compute hash of extracted proxy list
+                content_hash = dbs.compute_proxy_list_hash(extracted)
+
+                if content_hash and last_hash and content_hash == last_hash:
+                    # Content unchanged
+                    url_reports.append({
+                        'url': url,
+                        'success': True,
+                        'content_hash': content_hash,
+                        'proxy_count': len(extracted),
+                        'fetch_time_ms': fetch_time_ms,
+                        'changed': False,
+                        'error': None,
+                    })
+                    _log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
+                    continue
+
+                # Content changed or first fetch
+                for addr, pr, conf in extracted:
+                    all_extracted.append((addr, pr, conf, url))
+
+                url_reports.append({
+                    'url': url,
+                    'success': True,
+                    'content_hash': content_hash,
+                    'proxy_count': len(extracted),
+                    'fetch_time_ms': fetch_time_ms,
+                    'changed': True,
+                    'error': None,
+                })
+
+                _log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
+
+            # Report URL health to master
+            if url_reports:
+                try:
+                    worker_report_urls(server_url, wstate['worker_key'], url_reports)
+                except NeedReregister:
+                    do_register()
+                    try:
+                        worker_report_urls(server_url, wstate['worker_key'], url_reports)
+                    except NeedReregister:
+                        _log('still rejected after re-register, discarding url reports', 'error')
+
+            # Deduplicate extracted proxies by address
+            unique_proxies = []
+            source_map = {}  # addr -> first source_url
+            for addr, pr, conf, source_url in all_extracted:
+                if addr not in source_map:
+                    source_map[addr] = source_url
+                    unique_proxies.append((addr, pr, conf))
+
+            proxies_found += len(unique_proxies)
+
+            if not unique_proxies:
+                cycles += 1
+                time.sleep(1)
+                continue
+
+            _log('testing %d unique proxies' % len(unique_proxies), 'info')
+
+            # Phase 2: Test extracted proxies using worker thread pool
+            all_jobs = []
+            checktypes = config.watchd.checktypes
+
+            for addr, pr, conf in unique_proxies:
+                try:
+                    ip, port = split_addr(addr)
+                except ValueError:
+                    # Scraped input: skip what cannot be parsed, keep running
+                    _log('skipping malformed proxy address: %s' % addr, 'warn')
+                    continue
+
+                state = proxywatchd.ProxyTestState(
+                    ip, port, pr or 'http', 0,
+                    success_count=0, total_duration=0.0,
+                    country=None, mitm=0, consecutive_success=0,
+                    asn=None, oldies=False,
+                    completion_queue=completion_queue,
+                    proxy_full=addr, source_proto=pr
+                )
+
+                checktype = random.choice(checktypes)
+
+                if checktype == 'judges':
+                    available = proxywatchd.judge_stats.get_available_judges(
+                        list(proxywatchd.judges.keys()))
+                    target = random.choice(available) if available else random.choice(
+                        list(proxywatchd.judges.keys()))
+                elif checktype == 'ssl':
+                    target = random.choice(proxywatchd.ssl_targets)
+                elif checktype == 'irc':
+                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
+                else:  # head
+                    target = random.choice(list(proxywatchd.regexes.keys()))
+
+                all_jobs.append(proxywatchd.TargetTestJob(state, target, checktype))
+
+            # Drop completions left behind by a batch that timed out so they
+            # are not counted against this one.
+            # NOTE(review): jobs of a timed-out batch that finish after this
+            # drain can still be picked up below -- confirm whether
+            # proxywatchd offers a way to cancel queued jobs.
+            while True:
+                try:
+                    completion_queue.get_nowait()
+                except Queue.Empty:
+                    break
+
+            random.shuffle(all_jobs)
+            for job in all_jobs:
+                job_queue.put(job, priority=0)
+
+            # Wait for completion
+            completed = 0
+            timeout_start = time.time()
+            timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
+            working_results = []
+
+            while completed < len(all_jobs):
+                try:
+                    state = completion_queue.get(timeout=1)
+                except Queue.Empty:
+                    if time.time() - timeout_start > timeout_seconds:
+                        _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
+                        break
+                    continue
+
+                completed += 1
+
+                if state.failcount == 0:
+                    latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
+                    proxy_addr = state.proxy
+                    if state.auth:
+                        proxy_addr = '%s@%s' % (state.auth, state.proxy)
+
+                    working_results.append({
+                        'ip': state.ip,
+                        'port': state.port,
+                        'proto': state.proto,
+                        'source_proto': state.source_proto,
+                        'latency': round(latency_sec, 3),
+                        'exit_ip': state.exit_ip,
+                        'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
+                    })
+
+                if completed % 50 == 0 or completed == len(all_jobs):
+                    _log('tested %d/%d proxies (%d working)' % (
+                        completed, len(all_jobs), len(working_results)), 'info')
+
+            proxies_working += len(working_results)
+
+            # Report working proxies to master
+            if working_results:
+                try:
+                    processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
+                except NeedReregister:
+                    do_register()
+                    try:
+                        processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
+                    except NeedReregister:
+                        _log('still rejected after re-register, discarding proxy reports', 'error')
+                        processed = 0
+                _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
+
+            cycles += 1
+            time.sleep(1)
+
+    except KeyboardInterrupt:
+        _log('worker V2 stopping...', 'info')
+    finally:
+        # Runs for every exit path so the thread pool never outlives the loop
+        elapsed = time.time() - start_time
+        session.close()
+        for wt in threads:
+            wt.stop()
+        for wt in threads:
+            wt.term()
+        _log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
+        _log(' cycles: %d' % cycles, 'info')
+        _log(' urls fetched: %d' % urls_fetched, 'info')
+        _log(' proxies found: %d' % proxies_found, 'info')
+        _log(' proxies working: %d' % proxies_working, 'info')
+
+
 def main():
     """Main entry point."""
     global config
@@ -784,7 +1223,12 @@ def main():
         else:
             sys.exit(1)
 
-    # Worker mode: connect to master server instead of running locally
+    # V2 worker mode: URL-driven discovery
+    if config.args.worker_v2:
+        worker_v2_main(config)
+        return
+
+    # V1 worker mode: connect to master server instead of running locally
     if config.args.worker or config.args.register:
         worker_main(config)
         return