try:
    # Version string is surfaced by /health; fall back gracefully when the
    # package metadata is unavailable (e.g. running from a bare checkout).
    from ppf import __version__
except ImportError:
    __version__ = 'unknown'

# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
_rate_limit_requests = 300  # requests per window (increased for workers)
_rate_limit_window = 60     # window in seconds

# ---------------------------------------------------------------------------
# Worker registry for distributed proxy testing.
#
# Remote workers register via /api/register, claim batches of proxies via
# /api/work and report outcomes via /api/results.  State lives in module
# globals guarded by locks and is periodically persisted to _workers_file.
# ---------------------------------------------------------------------------
import hashlib
import random
import secrets
import string

_workers = {}             # worker_id -> {name, ip, key, last_seen, counters...}
_workers_lock = threading.Lock()
_work_claims = {}         # "ip:port" -> {'worker_id': ..., 'claimed_at': ...}
_work_claims_lock = threading.Lock()
_worker_keys = set()      # currently valid worker API keys
_master_key = None        # optional master key; authorizes every operation
_claim_timeout = 300      # seconds before an unreported claim is released
_workers_file = 'data/workers.json'  # persistent registry storage


def load_workers():
    """Load the worker registry and valid key set from disk, if present."""
    global _workers, _worker_keys
    try:
        if os.path.exists(_workers_file):
            with open(_workers_file, 'r') as f:
                data = json.load(f)
            _workers = data.get('workers', {})
            _worker_keys = set(data.get('keys', []))
            _log('loaded %d workers from %s' % (len(_workers), _workers_file), 'info')
    except Exception as e:
        # Best-effort: a corrupt or unreadable file must not prevent startup.
        _log('failed to load workers: %s' % e, 'warn')


def save_workers():
    """Persist the worker registry and key set to disk (best-effort)."""
    try:
        data = {
            'workers': _workers,
            'keys': list(_worker_keys),
            'saved': time.time(),
        }
        with open(_workers_file, 'w') as f:
            json.dump(data, f, indent=2)
    except Exception as e:
        _log('failed to save workers: %s' % e, 'warn')


def generate_worker_key():
    """Generate a random 32-character alphanumeric worker API key.

    Uses the ``secrets`` CSPRNG rather than ``random``: these keys
    authenticate workers, so they must not be predictable.
    """
    chars = string.ascii_letters + string.digits
    return ''.join(secrets.choice(chars) for _ in range(32))


def generate_worker_id(name, ip):
    """Derive a unique 16-hex-char worker ID from name, IP and current time."""
    data = '%s:%s:%s' % (name, ip, time.time())
    return hashlib.sha256(data.encode()).hexdigest()[:16]


def register_worker(name, ip, key=None):
    """Register a worker and return ``(worker_id, worker_key)``.

    If a worker with the same name already exists this is a re-registration
    (e.g. after a restart): its IP is updated, a fresh key is issued (the old
    one revoked) and its accumulated statistics are preserved.
    """
    with _workers_lock:
        for wid, info in _workers.items():
            if info.get('name') == name:
                new_key = key or generate_worker_key()
                _worker_keys.discard(info.get('key'))  # revoke the old key
                info['ip'] = ip
                info['key'] = new_key
                info['last_seen'] = time.time()
                _worker_keys.add(new_key)
                save_workers()
                return wid, new_key

    # New registration.
    worker_id = generate_worker_id(name, ip)
    worker_key = key or generate_worker_key()
    with _workers_lock:
        _workers[worker_id] = {
            'name': name,
            'ip': ip,
            'key': worker_key,
            'registered': time.time(),
            'last_seen': time.time(),
            'jobs_completed': 0,
            'proxies_tested': 0,
            'proxies_working': 0,
            'proxies_failed': 0,
            'total_latency': 0,
            'last_batch_size': 0,
            'last_batch_working': 0,
        }
        _worker_keys.add(worker_key)
        save_workers()
    return worker_id, worker_key


def validate_worker_key(key):
    """Return True when *key* is a known worker key (or the master key)."""
    if not key:
        return False
    if _master_key and key == _master_key:
        return True
    return key in _worker_keys


def get_worker_by_key(key):
    """Return ``(worker_id, info)`` for *key*, or ``(None, None)``."""
    with _workers_lock:
        for wid, info in _workers.items():
            if info.get('key') == key:
                return wid, info
    return None, None


def update_worker_heartbeat(worker_id):
    """Refresh the worker's ``last_seen`` timestamp (no-op if unknown)."""
    with _workers_lock:
        if worker_id in _workers:
            _workers[worker_id]['last_seen'] = time.time()


def claim_work(db, worker_id, count=100):
    """Claim a batch of proxies for *worker_id* to test.

    Returns a list of ``{'ip', 'port', 'proto', 'failed'}`` dicts.  The batch
    size is randomized (100-250) to stagger worker completion times and avoid
    a thundering herd, but never exceeds the caller-requested *count*
    (previously *count* was accepted but silently ignored).
    """
    now = time.time()
    now_int = int(now)

    # Randomized target, capped by the requested count.
    target_count = min(max(1, count), random.randint(100, 250))

    # Drop claims whose worker never reported back, then snapshot the set of
    # currently-claimed proxies so we can exclude them from the query.
    with _work_claims_lock:
        stale = [k for k, v in _work_claims.items() if now - v['claimed_at'] > _claim_timeout]
        for k in stale:
            del _work_claims[k]
        claimed_keys = set(_work_claims.keys())

    # Cooldown: tested + checktime + failed * perfail_checktime must be in
    # the past.  Priority tiers: 0=never tested, 1=overdue >1h, 2=recently due.
    checktime = 1800          # 30 min base cooldown
    perfail_checktime = 3600  # +1 hour per recorded failure
    max_fail = 5

    try:
        # SQLite caps host parameters (~999 by default): only push the
        # exclusion into SQL when it safely fits; otherwise over-fetch and
        # let the in-Python claim loop below filter already-claimed proxies.
        if claimed_keys and len(claimed_keys) <= 900:
            placeholders = ','.join('?' for _ in claimed_keys)
            exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders
            exclude_params = list(claimed_keys)
            fetch_limit = target_count
        else:
            exclude_clause = ""
            exclude_params = []
            fetch_limit = target_count + len(claimed_keys)

        query = '''
            SELECT ip, port, proto, failed,
                CASE
                    WHEN tested IS NULL THEN 0
                    WHEN (? - (tested + ? + (failed * ?))) > 3600 THEN 1
                    ELSE 2
                END as priority
            FROM proxylist
            WHERE failed >= 0 AND failed < ?
              AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?)
              %s
            ORDER BY priority, RANDOM()
            LIMIT ?
        ''' % exclude_clause

        params = [now_int, checktime, perfail_checktime,
                  max_fail, checktime, perfail_checktime, now_int] + exclude_params + [fetch_limit]
        rows = db.execute(query, params).fetchall()
    except Exception as e:
        _log('claim_work query error: %s' % e, 'error')
        return []

    # Record the claims; re-check membership under the lock because another
    # worker's request may have raced in since the snapshot above.
    claimed = []
    with _work_claims_lock:
        for row in rows:
            if len(claimed) >= target_count:
                break
            proxy_key = '%s:%s' % (row[0], row[1])
            if proxy_key not in _work_claims:
                _work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now}
                claimed.append({
                    'ip': row[0],
                    'port': row[1],
                    'proto': row[2],
                    'failed': row[3],
                })

    if claimed:
        _log('claim_work: %d proxies to %s (pool: %d claimed)' % (
            len(claimed), worker_id[:8], len(_work_claims)), 'info')

    return claimed


_last_workers_save = 0  # timestamp of the last periodic registry save


def submit_results(db, worker_id, results):
    """Apply a worker's batch of test results; return the count processed.

    For each result the proxy's claim is released and the proxylist row is
    updated: a working proxy has its failure counter reset and latency
    recorded; a failed proxy has its counter incremented.  Per-worker stats
    are accumulated and the registry is persisted at most once per minute.
    """
    global _last_workers_save
    processed = 0
    working_count = 0
    total_latency = 0
    now = time.time()

    with _workers_lock:
        if worker_id in _workers:
            _workers[worker_id]['last_seen'] = now

    for r in results:
        proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', ''))

        # Release the claim regardless of the outcome.
        with _work_claims_lock:
            if proxy_key in _work_claims:
                del _work_claims[proxy_key]

        try:
            if r.get('working'):
                db.execute('''
                    UPDATE proxylist SET
                        failed = 0,
                        tested = ?,
                        latency = ?
                    WHERE ip = ? AND port = ?
                ''', (int(now), r.get('latency', 0), r['ip'], r['port']))
                working_count += 1
                total_latency += r.get('latency', 0)
            else:
                db.execute('''
                    UPDATE proxylist SET
                        failed = failed + 1,
                        tested = ?
                    WHERE ip = ? AND port = ?
                ''', (int(now), r['ip'], r['port']))
            processed += 1
        except Exception:
            # Deliberately best-effort: one malformed result must not abort
            # the rest of the batch.
            pass

    # Fold the batch into the worker's lifetime statistics.
    with _workers_lock:
        if worker_id in _workers:
            w = _workers[worker_id]
            w['jobs_completed'] += 1
            w['proxies_tested'] += processed
            w['proxies_working'] = w.get('proxies_working', 0) + working_count
            w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count)
            w['total_latency'] = w.get('total_latency', 0) + total_latency
            w['last_batch_size'] = len(results)
            w['last_batch_working'] = working_count

    # Persist at most once per minute to bound disk traffic.
    if now - _last_workers_save > 60:
        save_workers()
        _last_workers_save = now

    return processed
""" - def __init__(self, host, port, database, stats_provider=None): + def __init__(self, host, port, database, stats_provider=None, profiling=False): threading.Thread.__init__(self) self.host = host self.port = port self.database = database self.stats_provider = stats_provider + self.profiling = profiling self.daemon = True self.server = None self._stop_event = threading.Event() if not GEVENT_PATCHED else None @@ -621,17 +888,49 @@ class ProxyAPIServer(threading.Thread): load_static_libs() # Load dashboard static files (HTML, CSS, JS) with theme substitution load_static_files(THEME) + # Load worker registry from disk + load_workers() def _wsgi_app(self, environ, start_response): """WSGI application wrapper for gevent.""" path = environ.get('PATH_INFO', '/').split('?')[0] + query_string = environ.get('QUERY_STRING', '') method = environ.get('REQUEST_METHOD', 'GET') remote_addr = environ.get('REMOTE_ADDR', '') - if method != 'GET': + # Parse query parameters + query_params = {} + if query_string: + for param in query_string.split('&'): + if '=' in param: + k, v = param.split('=', 1) + query_params[k] = v + + # Only allow GET and POST + if method not in ('GET', 'POST'): start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'Method not allowed'] + # POST only allowed for worker API endpoints + post_endpoints = ('/api/register', '/api/results') + if method == 'POST' and path not in post_endpoints: + start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) + return [b'POST not allowed for this endpoint'] + + # Parse POST body + post_data = None + if method == 'POST': + try: + content_length = int(environ.get('CONTENT_LENGTH', 0)) + if content_length > 0: + body = environ['wsgi.input'].read(content_length) + post_data = json.loads(body) + except Exception: + error_body = json.dumps({'error': 'invalid JSON body'}) + headers = [('Content-Type', 'application/json')] + start_response('400 Bad Request', headers) + return 
[error_body.encode('utf-8')] + # Rate limiting check if not check_rate_limit(remote_addr): error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}) @@ -646,7 +945,7 @@ class ProxyAPIServer(threading.Thread): # Route handling try: - response_body, content_type, status = self._handle_route(path, remote_addr) + response_body, content_type, status = self._handle_route(path, remote_addr, query_params, post_data) status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error') headers = [ ('Content-Type', content_type), @@ -666,8 +965,10 @@ class ProxyAPIServer(threading.Thread): start_response('500 Internal Server Error', headers) return [error_body] - def _handle_route(self, path, remote_addr=''): + def _handle_route(self, path, remote_addr='', query_params=None, post_data=None): """Handle route and return (body, content_type, status).""" + if query_params is None: + query_params = {} if path == '/': body = json.dumps({ 'endpoints': { @@ -677,6 +978,10 @@ class ProxyAPIServer(threading.Thread): '/api/stats': 'runtime statistics (JSON)', '/api/mitm': 'MITM certificate statistics (JSON)', '/api/countries': 'proxy counts by country (JSON)', + '/api/work': 'get work batch for worker (params: key, count)', + '/api/results': 'submit test results (POST, params: key)', + '/api/register': 'register as worker (POST)', + '/api/workers': 'list connected workers', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/count': 'count working proxies', '/health': 'health check', @@ -698,6 +1003,11 @@ class ProxyAPIServer(threading.Thread): if content: return content, 'text/html; charset=utf-8', 200 return '{"error": "mitm.html not loaded"}', 'application/json', 500 + elif path == '/workers': + content = get_static_file('workers.html') + if content: + return content, 'text/html; charset=utf-8', 200 + return '{"error": "workers.html not loaded"}', 'application/json', 500 elif path == '/static/style.css': content = 
get_static_file('style.css') if content: @@ -740,6 +1050,9 @@ class ProxyAPIServer(threading.Thread): stats['db_health'] = get_db_health(db) except Exception: pass + # Add profiling flag (from constructor or stats_provider) + if 'profiling' not in stats: + stats['profiling'] = self.profiling return json.dumps(stats, indent=2), 'application/json', 200 elif path == '/api/mitm': # MITM certificate statistics @@ -864,7 +1177,182 @@ class ProxyAPIServer(threading.Thread): except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/health': - return json.dumps({'status': 'ok', 'timestamp': int(time.time())}), 'application/json', 200 + return json.dumps({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())}), 'application/json', 200 + + # Worker API endpoints + elif path == '/api/register': + # Register a new worker (POST) + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + name = post_data.get('name', 'worker-%s' % remote_addr) + master_key = post_data.get('master_key', '') + # Require master key for registration (if set) + if _master_key and master_key != _master_key: + return json.dumps({'error': 'invalid master key'}), 'application/json', 403 + worker_id, worker_key = register_worker(name, remote_addr) + _log('worker registered: %s (%s) from %s' % (name, worker_id, remote_addr), 'info') + return json.dumps({ + 'worker_id': worker_id, + 'worker_key': worker_key, + 'message': 'registered successfully', + }), 'application/json', 200 + + elif path == '/api/work': + # Get batch of proxies to test (GET) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + count = int(query_params.get('count', 100)) + count = min(count, 500) # Cap at 500 + 
try: + db = mysqlite.mysqlite(self.database, str) + proxies = claim_work(db, worker_id, count) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'count': len(proxies), + 'proxies': proxies, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + + elif path == '/api/results': + # Submit test results (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + results = post_data.get('results', []) + if not results: + return json.dumps({'error': 'no results provided'}), 'application/json', 400 + try: + db = mysqlite.mysqlite(self.database, str) + processed = submit_results(db, worker_id, results) + return json.dumps({ + 'worker_id': worker_id, + 'processed': processed, + 'message': 'results submitted', + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + + elif path == '/api/heartbeat': + # Worker heartbeat with Tor status (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + # Update worker Tor and profiling status + tor_ok = post_data.get('tor_ok', True) if post_data else True + tor_ip = post_data.get('tor_ip') if post_data else None + profiling = post_data.get('profiling', False) if post_data else False + now = time.time() + with _workers_lock: + if worker_id in _workers: + _workers[worker_id]['last_seen'] = now + 
_workers[worker_id]['tor_ok'] = tor_ok + _workers[worker_id]['tor_ip'] = tor_ip + _workers[worker_id]['tor_last_check'] = now + _workers[worker_id]['profiling'] = profiling + return json.dumps({ + 'worker_id': worker_id, + 'message': 'heartbeat received', + }), 'application/json', 200 + + elif path == '/api/workers': + # List connected workers + now = time.time() + with _workers_lock: + workers = [] + total_tested = 0 + total_working = 0 + total_failed = 0 + for wid, info in _workers.items(): + tested = info.get('proxies_tested', 0) + working = info.get('proxies_working', 0) + failed = info.get('proxies_failed', 0) + total_latency = info.get('total_latency', 0) + avg_latency = round(total_latency / working, 2) if working > 0 else 0 + success_rate = round(100 * working / tested, 1) if tested > 0 else 0 + total_tested += tested + total_working += working + total_failed += failed + # Tor status with age check + tor_ok = info.get('tor_ok', True) + tor_ip = info.get('tor_ip') + tor_last_check = info.get('tor_last_check', 0) + tor_age = int(now - tor_last_check) if tor_last_check else None + worker_profiling = info.get('profiling', False) + workers.append({ + 'id': wid, + 'name': info['name'], + 'ip': info['ip'], + 'registered': int(info['registered']), + 'last_seen': int(info['last_seen']), + 'age': int(now - info['last_seen']), + 'jobs_completed': info.get('jobs_completed', 0), + 'proxies_tested': tested, + 'proxies_working': working, + 'proxies_failed': failed, + 'success_rate': success_rate, + 'avg_latency': avg_latency, + 'last_batch_size': info.get('last_batch_size', 0), + 'last_batch_working': info.get('last_batch_working', 0), + 'active': (now - info['last_seen']) < 120, + 'tor_ok': tor_ok, + 'tor_ip': tor_ip, + 'tor_age': tor_age, + 'profiling': worker_profiling, + }) + # Sort by name for consistent display + workers.sort(key=lambda w: w['name']) + # Get queue status from database + queue_stats = {'pending': 0, 'claimed': 0, 'due': 0} + try: + db = 
mysqlite.mysqlite(self.database, str) + # Pending = total eligible (not dead) + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < 5').fetchone() + queue_stats['pending'] = row[0] if row else 0 + # Claimed = currently being tested by workers + with _work_claims_lock: + queue_stats['claimed'] = len(_work_claims) + # Due = ready for testing (respecting cooldown) + now_int = int(time.time()) + checktime = 1800 # 30 min base + perfail_checktime = 3600 # +1 hour per failure + max_fail = 5 + row = db.execute(''' + SELECT COUNT(*) FROM proxylist + WHERE failed >= 0 AND failed < ? + AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?) + ''', (max_fail, checktime, perfail_checktime, now_int)).fetchone() + due_total = row[0] if row else 0 + queue_stats['due'] = max(0, due_total - queue_stats['claimed']) + db.close() + except Exception: + pass + return json.dumps({ + 'workers': workers, + 'total': len(workers), + 'active': sum(1 for w in workers if w['active']), + 'summary': { + 'total_tested': total_tested, + 'total_working': total_working, + 'total_failed': total_failed, + 'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0, + }, + 'queue': queue_stats, + }, indent=2), 'application/json', 200 + else: return json.dumps({'error': 'not found'}), 'application/json', 404 @@ -890,6 +1378,23 @@ class ProxyAPIServer(threading.Thread): stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 + + # Proxies due for testing (eligible for worker claims) + now_int = int(time.time()) + checktime = 1800 # 30 min base cooldown + perfail_checktime = 3600 # +1 hour per failure + max_fail = 5 + row = db.execute(''' + SELECT COUNT(*) FROM proxylist + WHERE failed >= 0 AND failed < ? + AND (tested IS NULL OR (tested + ? + (failed * ?)) < ?) 
+ ''', (max_fail, checktime, perfail_checktime, now_int)).fetchone() + due_total = row[0] if row else 0 + # Subtract currently claimed + with _work_claims_lock: + claimed_count = len(_work_claims) + stats['due'] = max(0, due_total - claimed_count) + stats['claimed'] = claimed_count except Exception: pass return stats