diff --git a/ppf.py b/ppf.py index 98d1f1c..83d2d9f 100644 --- a/ppf.py +++ b/ppf.py @@ -1,20 +1,26 @@ #!/usr/bin/env python2 +import sys +import os + +# Worker mode requires gevent - must monkey-patch before other imports +if '--worker' in sys.argv or '--register' in sys.argv: + from gevent import monkey + monkey.patch_all() + import cProfile import pstats import signal import dbs import time import mysqlite -import proxywatchd from misc import _log from config import Config import fetch -import sys from soup_parser import set_nobs import threading import random -import os +import json # Global profiler for signal handler access _profiler = None @@ -168,8 +174,6 @@ class Leechered(threading.Thread): def retrieve(self): return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute - def status(self): - return self.status def run(self): self.status = 'nok' @@ -267,6 +271,442 @@ class Leechered(threading.Thread): self.status = 'nok' +# Worker mode imports (lazy loaded) +try: + import urllib2 + import socket +except ImportError: + urllib2 = None + + +def worker_register(server_url, name, master_key=''): + """Register with master server and get credentials.""" + url = server_url.rstrip('/') + '/api/register' + data = json.dumps({'name': name, 'master_key': master_key}) + + req = urllib2.Request(url, data) + req.add_header('Content-Type', 'application/json') + + try: + resp = urllib2.urlopen(req, timeout=30) + result = json.loads(resp.read()) + return result.get('worker_id'), result.get('worker_key') + except Exception as e: + _log('registration failed: %s' % e, 'error') + return None, None + + +class NeedReregister(Exception): + """Raised when worker key is invalid and re-registration is needed.""" + pass + + +def worker_get_work(server_url, worker_key, count=100): + """Fetch batch of proxies from master.""" + url = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count) + + try: 
+ resp = urllib2.urlopen(url, timeout=30) + result = json.loads(resp.read()) + return result.get('proxies', []) + except urllib2.HTTPError as e: + if e.code == 403: + _log('worker key rejected (403), need to re-register', 'warn') + raise NeedReregister() + _log('failed to get work: %s' % e, 'error') + return [] + except Exception as e: + _log('failed to get work: %s' % e, 'error') + return [] + + +def worker_submit_results(server_url, worker_key, results): + """Submit test results to master.""" + url = '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key) + data = json.dumps({'results': results}) + + req = urllib2.Request(url, data) + req.add_header('Content-Type', 'application/json') + + try: + resp = urllib2.urlopen(req, timeout=30) + result = json.loads(resp.read()) + return result.get('processed', 0) + except urllib2.HTTPError as e: + if e.code == 403: + _log('worker key rejected (403), need to re-register', 'warn') + raise NeedReregister() + _log('failed to submit results: %s' % e, 'error') + return 0 + except Exception as e: + _log('failed to submit results: %s' % e, 'error') + return 0 + + +def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False): + """Send heartbeat with Tor status to master.""" + url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key) + data = json.dumps({ + 'tor_ok': tor_ok, + 'tor_ip': tor_ip, + 'profiling': profiling, + }) + + req = urllib2.Request(url, data) + req.add_header('Content-Type', 'application/json') + + try: + resp = urllib2.urlopen(req, timeout=10) + return True + except urllib2.HTTPError as e: + if e.code == 403: + raise NeedReregister() + return False + except Exception: + return False + + +def check_tor_connectivity(tor_hosts): + """Test Tor connectivity. 
Returns (working_hosts, tor_ip).""" + import socket + import socks + + working = [] + tor_ip = None + + for tor_host in tor_hosts: + host, port = tor_host.split(':') + port = int(port) + try: + test_sock = socks.socksocket() + test_sock.set_proxy(socks.SOCKS5, host, port) + test_sock.settimeout(15) + test_sock.connect(('check.torproject.org', 80)) + test_sock.send(b'GET /api/ip HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') + resp = test_sock.recv(1024) + test_sock.close() + + if resp and b'HTTP/' in resp: + working.append(tor_host) + # Extract IP from JSON response body + if b'\r\n\r\n' in resp: + body = resp.split(b'\r\n\r\n', 1)[1] + try: + data = json.loads(body) + tor_ip = data.get('IP') + except Exception: + pass + except Exception: + pass + + return working, tor_ip + + +def worker_main(config): + """Worker mode main loop - uses proxywatchd multi-threaded testing.""" + import json + global urllib2 + + try: + import Queue + except ImportError: + import queue as Queue + + # Import proxywatchd for multi-threaded testing (gevent already patched at top) + import proxywatchd + proxywatchd.set_config(config) + + server_url = config.args.server + if not server_url: + _log('--server URL required for worker mode', 'error') + sys.exit(1) + + worker_key = config.args.worker_key; worker_id = None # default: wstate below reads worker_id even when --worker-key skips registration + worker_name = config.args.worker_name or os.uname()[1] + batch_size = config.worker.batch_size + num_threads = config.watchd.threads + + # Register if --register flag or no key provided + if config.args.register or not worker_key: + _log('registering with master: %s' % server_url, 'info') + worker_id, worker_key = worker_register(server_url, worker_name) + if not worker_key: + _log('registration failed, exiting', 'error') + sys.exit(1) + _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info') + _log('worker key: %s' % worker_key, 'info') + _log('save this key with --worker-key for future runs', 'info') + + if config.args.register: + # Just register and exit + return + + 
_log('starting worker mode', 'info') + _log(' server: %s' % server_url, 'info') + _log(' threads: %d' % num_threads, 'info') + _log(' batch size: %d' % batch_size, 'info') + _log(' tor hosts: %s' % config.common.tor_hosts, 'info') + + # Verify Tor connectivity before claiming work + import socket + import socks + working_tor_hosts = [] + for tor_host in config.torhosts: + host, port = tor_host.split(':') + port = int(port) + try: + # Test SOCKS connection + test_sock = socks.socksocket() + test_sock.set_proxy(socks.SOCKS5, host, port) + test_sock.settimeout(10) + test_sock.connect(('check.torproject.org', 80)) + test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') + resp = test_sock.recv(512) + test_sock.close() + # Accept any HTTP response (200, 301, 302, etc.) + if resp and (b'HTTP/' in resp or len(resp) > 0): + status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50] + _log('tor host %s:%d OK (%s)' % (host, port, status), 'info') + working_tor_hosts.append(tor_host) + else: + _log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn') + except Exception as e: + _log('tor host %s:%d failed: %s' % (host, port, e), 'warn') + + if not working_tor_hosts: + _log('no working Tor hosts, cannot start worker', 'error') + sys.exit(1) + + _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info') + + # Create shared queues for worker threads + job_queue = proxywatchd.PriorityJobQueue() + result_queue = Queue.Queue() + completion_queue = Queue.Queue() + + # Spawn worker threads with stagger to avoid overwhelming Tor + threads = [] + for i in range(num_threads): + wt = proxywatchd.WorkerThread('w%d' % i, job_queue, result_queue) + wt.start_thread() + threads.append(wt) + time.sleep(random.random() / 10) # 0-100ms stagger per thread + + _log('spawned %d worker threads' % len(threads), 'info') + + jobs_completed = 0 + proxies_tested = 0 + start_time = time.time() + last_tor_check 
= time.time() + tor_check_interval = 300 # Check Tor every 5 minutes + current_tor_ip = None + consecutive_tor_failures = 0 + worker_profiling = config.args.profile or config.common.profiling + # Use dict to allow mutation in nested function (Python 2 compatible) + wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10} + + def do_register(): + """Register with master, with exponential backoff on failure.""" + while True: + _log('registering with master: %s' % server_url, 'info') + new_id, new_key = worker_register(server_url, worker_name) + if new_key: + wstate['worker_id'] = new_id + wstate['worker_key'] = new_key + wstate['backoff'] = 10 # Reset backoff on success + _log('registered as %s (id: %s)' % (worker_name, new_id), 'info') + return True + else: + _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn') + time.sleep(wstate['backoff']) + wstate['backoff'] = min(wstate['backoff'] * 2, 300) # Max 5 min backoff + + def wait_for_tor(): + """Wait for Tor to become available, with exponential backoff.""" + backoff = 10 + while True: + working, tor_ip = check_tor_connectivity(config.torhosts) + if working: + _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info') + # Send heartbeat to manager + try: + worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling) + except NeedReregister: + do_register() + return working, tor_ip + _log('tor still down, retrying in %ds' % backoff, 'warn') + # Send heartbeat with tor_ok=False + try: + worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling) + except NeedReregister: + do_register() + time.sleep(backoff) + backoff = min(backoff * 2, 300) # Max 5 min backoff + + try: + while True: + # Periodic Tor health check + now = time.time() + if now - last_tor_check > tor_check_interval: + working, tor_ip = check_tor_connectivity(config.torhosts) + last_tor_check = now + if working: + consecutive_tor_failures = 0 + if 
tor_ip != current_tor_ip: + _log('tor circuit rotated: %s' % tor_ip, 'info') + current_tor_ip = tor_ip + # Send periodic heartbeat + try: + worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling) + except NeedReregister: + do_register() + else: + consecutive_tor_failures += 1 + _log('tor connectivity failed (consecutive: %d)' % consecutive_tor_failures, 'warn') + if consecutive_tor_failures >= 2: + _log('tor appears down, pausing work', 'error') + try: + worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling) + except NeedReregister: + do_register() + working, current_tor_ip = wait_for_tor() + consecutive_tor_failures = 0 + last_tor_check = time.time() + + # Get work from master + try: + proxies = worker_get_work(server_url, wstate['worker_key'], batch_size) + except NeedReregister: + do_register() + continue + + if not proxies: + _log('no work available, sleeping 30s', 'info') + time.sleep(30) + continue + + _log('received %d proxies to test' % len(proxies), 'info') + + # Create ProxyTestState and jobs for each proxy + pending_states = {} + all_jobs = [] + + # Get checktype(s) from config + checktypes = config.watchd.checktypes + + for proxy_info in proxies: + ip = proxy_info['ip'] + port = proxy_info['port'] + proto = proxy_info.get('proto', 'http') + failed = proxy_info.get('failed', 0) + proxy_str = '%s:%d' % (ip, port) + + # Create state for this proxy + state = proxywatchd.ProxyTestState( + ip, port, proto, failed, + success_count=0, total_duration=0.0, + country=None, mitm=0, consecutive_success=0, + asn=None, num_targets=1, oldies=False, + completion_queue=completion_queue, + proxy_full=proxy_str + ) + pending_states[proxy_str] = state + + # Select random checktype + checktype = random.choice(checktypes) + + # Get target for this checktype + if checktype == 'judges': + available = proxywatchd.judge_stats.get_available_judges( + list(proxywatchd.judges.keys())) + target = random.choice(available) 
if available else random.choice( + list(proxywatchd.judges.keys())) + elif checktype == 'ssl': + target = random.choice(proxywatchd.ssl_targets) + elif checktype == 'irc': + target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667' + else: # head + target = random.choice(list(proxywatchd.regexes.keys())) + + job = proxywatchd.TargetTestJob(state, target, checktype) + all_jobs.append(job) + + # Shuffle and queue jobs + random.shuffle(all_jobs) + for job in all_jobs: + job_queue.put(job, priority=0) + + # Wait for all jobs to complete + completed = 0 + results = [] + timeout_start = time.time() + timeout_seconds = config.watchd.timeout * 2 + 30 # generous timeout + + while completed < len(proxies): + try: + state = completion_queue.get(timeout=1) + completed += 1 + + # Build result from state (failcount == 0 means success) + is_working = state.failcount == 0 + latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0 + result = { + 'ip': state.ip, + 'port': state.port, + 'proto': state.proto, + 'working': is_working, + 'latency': round(latency_sec, 3) if is_working else 0, + 'error': None if is_working else 'failed', + } + results.append(result) + + # Progress logging + if completed % 20 == 0 or completed == len(proxies): + working = sum(1 for r in results if r.get('working')) + _log('tested %d/%d proxies (%d working)' % ( + completed, len(proxies), working), 'info') + + except Queue.Empty: + if time.time() - timeout_start > timeout_seconds: + _log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn') + break + continue + + # Submit results + try: + processed = worker_submit_results(server_url, wstate['worker_key'], results) + except NeedReregister: + do_register() + # Retry submission with new key + try: + processed = worker_submit_results(server_url, wstate['worker_key'], results) + except NeedReregister: + _log('still rejected after re-register, discarding batch', 'error') + processed = 0 + + 
jobs_completed += 1 + proxies_tested += len(results) + + working = sum(1 for r in results if r.get('working')) + _log('batch %d: %d/%d working, submitted %d' % ( + jobs_completed, working, len(results), processed), 'info') + + # Brief pause between batches + time.sleep(1) + + except KeyboardInterrupt: + elapsed = time.time() - start_time + _log('worker stopping...', 'info') + # Stop threads + for wt in threads: + wt.stop() + for wt in threads: + wt.term() + _log('worker stopped after %s' % format_duration(int(elapsed)), 'info') + _log(' jobs completed: %d' % jobs_completed, 'info') + _log(' proxies tested: %d' % proxies_tested, 'info') + + def main(): """Main entry point.""" global config @@ -279,6 +719,11 @@ def main(): else: sys.exit(1) + # Worker mode: connect to master server instead of running locally + if config.args.worker or config.args.register: + worker_main(config) + return + proxydb = mysqlite.mysqlite(config.watchd.database, str) dbs.create_table_if_not_exists(proxydb, 'proxylist') fetch.init_known_proxies(proxydb) @@ -293,12 +738,25 @@ def main(): if len(sys.argv) == 3 and sys.argv[1] == "--file": sys.exit(import_proxies_from_file(proxydb, sys.argv[2])) - # start proxy watcher + # start proxy watcher (import here to avoid gevent dependency in worker mode) + import proxywatchd + httpd_server = None if config.watchd.threads > 0: watcherd = proxywatchd.Proxywatchd() watcherd.start() else: watcherd = None + # Start httpd independently when watchd is disabled + if config.httpd.enabled: + from httpd import ProxyAPIServer + profiling = config.args.profile or config.common.profiling + httpd_server = ProxyAPIServer( + config.httpd.listenip, + config.httpd.port, + config.watchd.database, + profiling=profiling, + ) + httpd_server.start() # start scraper threads if enabled scrapers = [] @@ -310,11 +768,27 @@ def main(): scrapers.append(s) _log('started %d scraper thread(s)' % len(scrapers), 'info') - qurl = 'SELECT 
url,stale_count,error,retrievals,proxies_added,content_type,content_hash FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) ? OR proxies_added > 0) + ORDER BY RANDOM()''' + # Query to count skipped old URLs (for logging) + qurl_skipped = '''SELECT COUNT(*) FROM uris + WHERE error < ? + AND (check_time+?+((error+stale_count)*?) < ?) + AND added <= ? + AND proxies_added = 0''' threads = [] rows = [] reqtime = time.time() - 3600 statusmsg = time.time() + list_max_age_seconds = config.ppf.list_max_age_days * 86400 + last_skip_log = 0 while True: try: time.sleep(random.random()/10) @@ -323,15 +797,23 @@ def main(): statusmsg = time.time() if not rows: if (time.time() - reqtime) > 3: - rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, int(time.time()))).fetchall() + now = int(time.time()) + min_added = now - list_max_age_seconds + rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchall() reqtime = time.time() + # Log skipped old URLs periodically (every 10 minutes) + if (time.time() - last_skip_log) > 600: + skipped = urldb.execute(qurl_skipped, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchone() + if skipped and skipped[0] > 0: + _log('skipping %d old proxy lists (added >%d days ago, no proxies found)' % (skipped[0], config.ppf.list_max_age_days), 'stale') + last_skip_log = time.time() if len(rows) < config.ppf.threads: time.sleep(60) rows = [] else: _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf') - _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute('SELECT proto,proxy from proxylist where failed=0').fetchall() ] + _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ] if not _proxylist: _proxylist 
= None for thread in threads: @@ -366,6 +848,8 @@ def main(): if watcherd: watcherd.stop() watcherd.finish() + if httpd_server: + httpd_server.stop() break _log('ppf stopped', 'info')