# Patch summary. This text sits before the first "diff --git" header and is
# not part of any hunk.
#
# NOTE(review): this patch was recovered from a copy whose line breaks and
# indentation had been collapsed. Line order, +/-/context markers and line
# counts follow the hunk headers; leading indentation and the exact position
# of blank context lines are reconstructed -- verify against the target files
# before applying.
#
# httpd.py
#   * _min_batch_size / _max_batch_size change from 100 / 1000 to 1 / 10000.
#   * calculate_fair_batch_size returns 0 when nothing is due, drops the 1.2x
#     buffer and clamps only to _max_batch_size (the share is at least 1).
#   * claim_work selects source_proto and adds it to each claimed entry.
#   * The four proxy-listing queries also select protos_working; each JSON
#     entry gains a 'protos' list (protos_working split on ',', else [proto]).
#   * ProxyAPIServer.__init__ accepts and stores url_database (default None).
# ppf.py
#   * gevent monkey.patch_all() now runs unconditionally at import time.
#     NOTE(review): this makes gevent a hard import for every mode -- confirm.
#   * worker_main reads 'source_proto' from the claimed entry and passes it as
#     a keyword argument when the per-proxy state is created.
#   * url_database=config.ppf.database is added to the argument list of the
#     call that precedes httpd_server.start().
#   * The _proxylist query moves inside the branch that starts a new thread.
# proxywatchd.py
#   * ProxyTestState gains the source_proto and protos_working slots.
#   * The distinct protos found in `successes` are stored, comma-joined, in
#     protos_working; proto is chosen by preference socks5 > socks4 > http.
#   * TargetTestJob._build_proto_order is added and replaces the fixed
#     ['http', 'socks5', 'socks4'] / [ps.proto] selection.
#     NOTE(review): its docstring mentions "confidence >= 60", but the added
#     code contains no confidence check -- confirm which is intended.
#   * WorkerThread wraps job.run() in try/except, logs the exception and calls
#     job.proxy_state.record_result(False, category='exception').
#   * Both SELECTs add source_proto; the UPDATE also writes protos_working.
#   * The main loop processes jobs only when config.watchd.threads > 0;
#     otherwise it sets sleeptime = 10.
#
diff --git a/httpd.py b/httpd.py
index 86787f2..420f111 100644
--- a/httpd.py
+++ b/httpd.py
@@ -86,8 +86,8 @@ _worker_test_history_lock = threading.Lock()
 _test_history_window = 120  # seconds to keep test history for rate calculation
 
 # Fair distribution settings
-_min_batch_size = 100  # minimum proxies per batch
-_max_batch_size = 1000  # maximum proxies per batch
+_min_batch_size = 1  # minimum proxies per batch
+_max_batch_size = 10000  # maximum proxies per batch
 _worker_timeout = 120  # seconds before worker considered inactive
 
 # Session tracking
@@ -170,19 +170,22 @@ def get_due_proxy_count(db):
 
 
 def calculate_fair_batch_size(db, worker_id):
-    """Calculate fair batch size based on active workers and queue size."""
+    """Calculate fair batch size based on active workers and queue size.
+
+    Divides due work evenly among active workers. No artificial floor —
+    if only 6 proxies are due with 3 workers, each gets 2.
+    """
     active_workers = max(1, get_active_worker_count())
     due_count = get_due_proxy_count(db)
 
     if due_count == 0:
-        return _min_batch_size
+        return 0
 
-    # Fair share: divide due work among active workers
-    # Add 20% buffer for speed variations between workers
-    fair_share = int((due_count / active_workers) * 1.2)
+    # Fair share: divide due work evenly among active workers
+    fair_share = max(1, int(due_count / active_workers))
 
-    # Clamp to bounds
-    batch_size = max(_min_batch_size, min(fair_share, _max_batch_size))
+    # Clamp to upper bound only
+    batch_size = min(fair_share, _max_batch_size)
 
     _log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
         due_count, active_workers, fair_share, batch_size), 'debug')
@@ -381,7 +384,7 @@ def claim_work(db, worker_id, count=100):
     priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
 
     query = '''
-        SELECT ip, port, proto, failed,
+        SELECT ip, port, proto, failed, source_proto,
                CASE
                    WHEN tested IS NULL THEN 0
                    WHEN (%s) > 3600 THEN 1
@@ -413,6 +416,7 @@ def claim_work(db, worker_id, count=100):
             'port': row[1],
             'proto': row[2],
             'failed': row[3],
+            'source_proto': row[4],
         })
 
     if claimed:
@@ -1078,7 +1082,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
             asn = params.get('asn', '')
             fmt = params.get('format', 'json')
 
-            sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
+            sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
             args = []
 
             if proto:
@@ -1103,7 +1107,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
             else:
                 proxies = [{
                     'ip': r[0], 'port': r[1], 'proto': r[2],
-                    'country': r[3], 'asn': r[4], 'latency': r[5]
+                    'country': r[3], 'asn': r[4], 'latency': r[5],
+                    'protos': r[6].split(',') if r[6] else [r[2]]
                 } for r in rows]
                 self.send_json({'count': len(proxies), 'proxies': proxies})
         except Exception as e:
@@ -1122,7 +1127,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
             asn = params.get('asn', '')
             fmt = params.get('format', 'json')
 
-            sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
+            sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
             args = []
 
             if proto:
@@ -1146,7 +1151,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
             else:
                 proxies = [{
                     'ip': r[0], 'port': r[1], 'proto': r[2],
-                    'country': r[3], 'asn': r[4], 'latency': r[5]
+                    'country': r[3], 'asn': r[4], 'latency': r[5],
+                    'protos': r[6].split(',') if r[6] else [r[2]]
                 } for r in rows]
                 self.send_json({'count': len(proxies), 'proxies': proxies})
         except Exception as e:
@@ -1171,11 +1177,12 @@ class ProxyAPIServer(threading.Thread):
     otherwise falls back to standard BaseHTTPServer.
     """
 
-    def __init__(self, host, port, database, stats_provider=None, profiling=False):
+    def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None):
         threading.Thread.__init__(self)
         self.host = host
         self.port = port
         self.database = database
+        self.url_database = url_database
         self.stats_provider = stats_provider
         self.profiling = profiling
         self.daemon = True
@@ -1444,7 +1451,7 @@ class ProxyAPIServer(threading.Thread):
                 asn = query_params.get('asn', '')
                 fmt = query_params.get('format', 'json')
 
-                sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
+                sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
                 args = []
                 if proto:
                     sql += ' AND proto=?'
@@ -1463,7 +1470,11 @@ class ProxyAPIServer(threading.Thread):
 
                 if fmt == 'plain':
                     return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
-                proxies = [{'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5]} for r in rows]
+                proxies = [{
+                    'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
+                    'asn': r[4], 'latency': r[5],
+                    'protos': r[6].split(',') if r[6] else [r[2]]
+                } for r in rows]
                 return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
             except Exception as e:
                 return json.dumps({'error': str(e)}), 'application/json', 500
@@ -1474,7 +1485,7 @@ class ProxyAPIServer(threading.Thread):
                 asn = query_params.get('asn', '')
                 fmt = query_params.get('format', 'json')
 
-                sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
+                sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
                 args = []
                 if proto:
                     sql += ' AND proto=?'
@@ -1492,7 +1503,11 @@ class ProxyAPIServer(threading.Thread):
 
                 if fmt == 'plain':
                     return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
-                proxies = [{'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5]} for r in rows]
+                proxies = [{
+                    'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
+                    'asn': r[4], 'latency': r[5],
+                    'protos': r[6].split(',') if r[6] else [r[2]]
+                } for r in rows]
                 return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
             except Exception as e:
                 return json.dumps({'error': str(e)}), 'application/json', 500
diff --git a/ppf.py b/ppf.py
index 748a37b..07d2b6b 100644
--- a/ppf.py
+++ b/ppf.py
@@ -5,10 +5,10 @@ __version__ = '2.0.0'
 import sys
 import os
 
-# Worker mode requires gevent - must monkey-patch before other imports
-if '--worker' in sys.argv or '--register' in sys.argv:
-    from gevent import monkey
-    monkey.patch_all()
+# Gevent monkey-patch MUST happen before any other imports
+# Both master (httpd) and worker modes use gevent for async I/O
+from gevent import monkey
+monkey.patch_all()
 
 import cProfile
 import pstats
@@ -598,6 +598,7 @@ def worker_main(config):
                 port = proxy_info['port']
                 proto = proxy_info.get('proto', 'http')
                 failed = proxy_info.get('failed', 0)
+                source_proto = proxy_info.get('source_proto')
                 proxy_str = '%s:%d' % (ip, port)
 
                 # Create state for this proxy
@@ -607,7 +608,7 @@ def worker_main(config):
                     country=None, mitm=0, consecutive_success=0,
                     asn=None, oldies=False,
                     completion_queue=completion_queue,
-                    proxy_full=proxy_str
+                    proxy_full=proxy_str, source_proto=source_proto
                 )
                 pending_states[proxy_str] = state
 
@@ -780,6 +781,7 @@ def main():
             config.watchd.database,
             stats_provider=httpd_stats_provider,
             profiling=profiling,
+            url_database=config.ppf.database,
         )
         httpd_server.start()
 
@@ -838,9 +840,6 @@ def main():
             else:
                 _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
 
-            _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
-            if not _proxylist: _proxylist = None
-
             for thread in threads:
                 if thread.status == 'ok':
                     url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
@@ -857,6 +856,9 @@ def main():
 
             threads = [ thread for thread in threads if thread.is_alive() ]
             if len(threads) < config.ppf.threads and rows:
+                # Only query proxydb when actually starting a new thread (reduces GIL blocking)
+                _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
+                if not _proxylist: _proxylist = None
                 p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
                 row = random.choice(rows)
                 urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))
diff --git a/proxywatchd.py b/proxywatchd.py
index 0755887..ad886c5 100644
--- a/proxywatchd.py
+++ b/proxywatchd.py
@@ -285,12 +285,12 @@ class ProxyTestState(object):
         'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
         'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
         'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
-        'cert_error'
+        'cert_error', 'source_proto', 'protos_working'
     )
 
     def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                  country, mitm, consecutive_success, asn=None, oldies=False,
-                 completion_queue=None, proxy_full=None):
+                 completion_queue=None, proxy_full=None, source_proto=None):
         self.ip = ip
         self.port = int(port)
         self.proxy = '%s:%s' % (ip, port)
@@ -326,6 +326,9 @@ class ProxyTestState(object):
         self.had_ssl_test = False
         self.ssl_success = False
         self.cert_error = False
+        # Protocol fingerprinting
+        self.source_proto = source_proto
+        self.protos_working = None
 
     def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
         """Record a single target test result. Thread-safe.
@@ -432,7 +435,22 @@ class ProxyTestState(object):
                     if config.watchd.debug:
                         _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')
 
-            self.proto = last_good['proto']
+            # Collect all distinct working protocols
+            working_protos = set()
+            for s in successes:
+                if s.get('proto'):
+                    working_protos.add(s['proto'])
+            if working_protos:
+                self.protos_working = ','.join(sorted(working_protos))
+                # Pick most specific protocol: socks5 > socks4 > http
+                for best in ('socks5', 'socks4', 'http'):
+                    if best in working_protos:
+                        self.proto = best
+                        break
+                else:
+                    self.proto = last_good['proto']
+            else:
+                self.proto = last_good['proto']
             self.failcount = 0
             # Only reset mitm after 3 consecutive clean successes (not on first success)
             # and only if this test didn't detect MITM
@@ -598,6 +616,45 @@ class TargetTestJob(object):
             finally:
                 sock.disconnect()
 
+    def _build_proto_order(self):
+        """Build smart protocol test order based on available intelligence.
+
+        Priority:
+        1. Previously successful proto (if set)
+        2. Source-detected proto (if different, confidence >= 60)
+        3. Remaining protos in default order: socks5, socks4, http
+
+        For failing proxies (failcount > 0 and proto known), only retest
+        with the known proto to save resources.
+        """
+        ps = self.proxy_state
+        default_order = ['socks5', 'socks4', 'http']
+
+        # Known proto from previous test: only retest that
+        if ps.proto is not None:
+            # For failing proxies, skip multi-proto discovery
+            if ps.failcount > 0:
+                return [ps.proto]
+            # For working proxies, lead with known proto but try others
+            protos = [ps.proto]
+            # Add source hint if different
+            if ps.source_proto and ps.source_proto != ps.proto:
+                protos.append(ps.source_proto)
+            # Fill remaining
+            for p in default_order:
+                if p not in protos:
+                    protos.append(p)
+            return protos
+
+        # Unknown proto: use source hint if available
+        protos = []
+        if ps.source_proto:
+            protos.append(ps.source_proto)
+        for p in default_order:
+            if p not in protos:
+                protos.append(p)
+        return protos
+
     def _connect_and_test(self):
         """Connect to target through the proxy and send test packet.
 
@@ -615,7 +672,7 @@ class TargetTestJob(object):
             _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
                 ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')
 
-        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
+        protos = self._build_proto_order()
         pool = connection_pool.get_pool()
 
         # Phase 1: SSL handshake (if ssl_first enabled)
@@ -882,7 +939,15 @@ class WorkerThread():
                 nao = time.time()
                 # Assign worker ID for connection pool affinity
                 job.worker_id = self.id
-                job.run()
+                try:
+                    job.run()
+                except Exception as e:
+                    # Ensure state completes on unexpected exceptions (prevents memory leak)
+                    _log('job exception: %s' % e, 'error')
+                    try:
+                        job.proxy_state.record_result(False, category='exception')
+                    except Exception:
+                        pass  # State may already be completed
                 spent = time.time() - nao
                 job_count += 1
                 duration_total += spent
@@ -1251,7 +1316,7 @@ class Proxywatchd():
         # Build due condition using new schedule formula
         due_sql, due_params = _build_due_sql()
         q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
-               consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
+               consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
         _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
             config.watchd.working_checktime, config.watchd.fail_retry_interval,
             config.watchd.fail_retry_backoff, config.watchd.max_fail))
@@ -1271,7 +1336,7 @@ class Proxywatchd():
             now = time.time()
             oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
             q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
-                    mitm,consecutive_success,asn,proxy FROM proxylist
+                    mitm,consecutive_success,asn,proxy,source_proto FROM proxylist
                     WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
                     ORDER BY RANDOM()'''
             rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
@@ -1314,12 +1379,12 @@ class Proxywatchd():
         for row in rows:
             # create shared state for this proxy
             # row: ip, port, proto, failed, success_count, total_duration,
-            #      country, mitm, consecutive_success, asn, proxy
+            #      country, mitm, consecutive_success, asn, proxy, source_proto
             state = ProxyTestState(
                 row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                 asn=row[9], oldies=self.isoldies,
                 completion_queue=self.completion_queue,
-                proxy_full=row[10]
+                proxy_full=row[10], source_proto=row[11]
             )
             new_states.append(state)
 
@@ -1424,7 +1489,7 @@ class Proxywatchd():
                     dead_count += 1
                 args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
                              job.success_count, job.total_duration, job.mitm,
-                             job.consecutive_success, job.asn, job.proxy))
+                             job.consecutive_success, job.asn, job.protos_working, job.proxy))
 
             success_rate = (float(sc) / len(self.collected)) * 100
             ret = True
@@ -1438,7 +1503,7 @@ class Proxywatchd():
                 if job.failcount == 0:
                     args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                                  job.success_count, job.total_duration, job.mitm,
-                                 job.consecutive_success, job.asn, job.proxy))
+                                 job.consecutive_success, job.asn, job.protos_working, job.proxy))
                     if job.last_latency_ms is not None:
                         latency_updates.append((job.proxy, job.last_latency_ms))
             ret = False
@@ -1455,7 +1520,7 @@ class Proxywatchd():
                            if job.failcount == 0 and job.exit_ip]
 
         with self._db_context() as db:
-            query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
+            query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?'
             db.executemany(query, args)
 
             # Batch update latency metrics for successful proxies
@@ -1699,7 +1764,8 @@ class Proxywatchd():
                 config.httpd.listenip,
                 config.httpd.port,
                 config.watchd.database,
-                stats_provider=self.get_runtime_stats
+                stats_provider=self.get_runtime_stats,
+                url_database=config.ppf.database,
             )
             self.httpd_server.start()
 
@@ -1734,27 +1800,32 @@ class Proxywatchd():
                 sleeptime -= 1
                 continue
 
-            # check if job queue is empty (work-stealing: threads pull as needed)
-            if self.job_queue.empty():
+            # Skip job processing when threads=0 (master-only mode)
+            if config.watchd.threads > 0:
+                # check if job queue is empty (work-stealing: threads pull as needed)
+                if self.job_queue.empty():
+                    self.collect_work()
+                    if not self.submit_collected() and self.tor_safeguard:
+                        _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
+                        sleeptime = 60
+                    else:
+                        job_count = self.prepare_jobs()
+                        if job_count == 0:
+                            # no jobs available, wait before checking again
+                            sleeptime = 10
+
+                if not self.in_background:  # single_thread scenario
+                    self.threads[0].workloop()
+
                 self.collect_work()
-                if not self.submit_collected() and self.tor_safeguard:
-                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
-                    sleeptime = 60
-                else:
-                    job_count = self.prepare_jobs()
-                    if job_count == 0:
-                        # no jobs available, wait before checking again
-                        sleeptime = 10
 
-            if not self.in_background:  # single_thread scenario
-                self.threads[0].workloop()
-
-            self.collect_work()
-
-            if len(self.collected) > self.submit_after:
-                if not self.submit_collected() and self.tor_safeguard:
-                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
-                    sleeptime = 60
+                if len(self.collected) > self.submit_after:
+                    if not self.submit_collected() and self.tor_safeguard:
+                        _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
+                        sleeptime = 60
+            else:
+                # Master-only mode: sleep to avoid busy loop
+                sleeptime = 10
 
             # Update rate history for sparklines
             self.stats.update_history()