httpd: pass url database to api server

Username
2026-02-17 13:42:01 +01:00
parent da832d94b7
commit 5197c3b7e6
3 changed files with 147 additions and 59 deletions

@@ -86,8 +86,8 @@ _worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
# Fair distribution settings
_min_batch_size = 100 # minimum proxies per batch
_max_batch_size = 1000 # maximum proxies per batch
_min_batch_size = 1 # minimum proxies per batch
_max_batch_size = 10000 # maximum proxies per batch
_worker_timeout = 120 # seconds before worker considered inactive
# Session tracking
@@ -170,19 +170,22 @@ def get_due_proxy_count(db):
def calculate_fair_batch_size(db, worker_id):
"""Calculate fair batch size based on active workers and queue size."""
"""Calculate fair batch size based on active workers and queue size.
Divides due work evenly among active workers. No artificial floor —
if only 6 proxies are due with 3 workers, each gets 2.
"""
active_workers = max(1, get_active_worker_count())
due_count = get_due_proxy_count(db)
if due_count == 0:
return _min_batch_size
return 0
# Fair share: divide due work among active workers
# Add 20% buffer for speed variations between workers
fair_share = int((due_count / active_workers) * 1.2)
# Fair share: divide due work evenly among active workers
fair_share = max(1, int(due_count / active_workers))
# Clamp to bounds
batch_size = max(_min_batch_size, min(fair_share, _max_batch_size))
# Clamp to upper bound only
batch_size = min(fair_share, _max_batch_size)
_log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
due_count, active_workers, fair_share, batch_size), 'debug')
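
For reference, a minimal standalone sketch of the new sizing rule (fair_batch is a hypothetical name; the real function reads the worker count and queue depth from the database):

def fair_batch(due_count, active_workers, max_batch=10000):
    # Mirrors calculate_fair_batch_size: even split, no artificial floor
    if due_count == 0:
        return 0                              # nothing due: empty batch
    share = max(1, due_count // max(1, active_workers))
    return min(share, max_batch)              # only the upper clamp remains

assert fair_batch(6, 3) == 2                  # the docstring's own example
assert fair_batch(0, 5) == 0
assert fair_batch(50000, 2) == 10000          # clamped to _max_batch_size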
@@ -381,7 +384,7 @@ def claim_work(db, worker_id, count=100):
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
query = '''
SELECT ip, port, proto, failed,
SELECT ip, port, proto, failed, source_proto,
CASE
WHEN tested IS NULL THEN 0
WHEN (%s) > 3600 THEN 1
@@ -413,6 +416,7 @@ def claim_work(db, worker_id, count=100):
'port': row[1],
'proto': row[2],
'failed': row[3],
'source_proto': row[4],
})
if claimed:
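
For illustration, a single claimed item now carries the ingest-time hint alongside the usual fields (values hypothetical):

claimed_item = {
    'ip': '198.51.100.7',       # documentation address, illustrative only
    'port': 1080,
    'proto': None,              # not yet confirmed by a live test
    'failed': 0,
    'source_proto': 'socks5',   # new: hint recorded when the proxy was ingested
}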
@@ -1078,7 +1082,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
asn = params.get('asn', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
@@ -1103,7 +1107,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5]
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
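
The new protos field splits the comma-separated protos_working column, falling back to the single confirmed proto when the column is NULL. A quick sketch with a hypothetical row in the SELECT's column order:

r = ('203.0.113.9', 8080, 'socks5', 'DE', 'AS64496', 412, 'http,socks5')
protos = r[6].split(',') if r[6] else [r[2]]
print(protos)   # ['http', 'socks5']; with r[6] = None this falls back to ['socks5']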
@@ -1122,7 +1127,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
asn = params.get('asn', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
@@ -1146,7 +1151,8 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5]
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
@@ -1171,11 +1177,12 @@ class ProxyAPIServer(threading.Thread):
otherwise falls back to standard BaseHTTPServer.
"""
def __init__(self, host, port, database, stats_provider=None, profiling=False):
def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.database = database
self.url_database = url_database
self.stats_provider = stats_provider
self.profiling = profiling
self.daemon = True
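
A hypothetical construction showing the new keyword (paths illustrative; the positional arguments mirror the call sites later in this diff):

api = ProxyAPIServer(
    '127.0.0.1', 8080,
    '/var/lib/ppf/proxylist.db',          # watchd proxy database
    url_database='/var/lib/ppf/urls.db',  # new: ppf URL database
)
api.start()   # daemon thread, per __init__ above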
@@ -1444,7 +1451,7 @@ class ProxyAPIServer(threading.Thread):
asn = query_params.get('asn', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
@@ -1463,7 +1470,11 @@ class ProxyAPIServer(threading.Thread):
if fmt == 'plain':
return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5]} for r in rows]
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
@@ -1474,7 +1485,7 @@ class ProxyAPIServer(threading.Thread):
asn = query_params.get('asn', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
@@ -1492,7 +1503,11 @@ class ProxyAPIServer(threading.Thread):
if fmt == 'plain':
return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5]} for r in rows]
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500

ppf.py

@@ -5,10 +5,10 @@ __version__ = '2.0.0'
import sys
import os
# Worker mode requires gevent - must monkey-patch before other imports
if '--worker' in sys.argv or '--register' in sys.argv:
from gevent import monkey
monkey.patch_all()
# Gevent monkey-patch MUST happen before any other imports
# Both master (httpd) and worker modes use gevent for async I/O
from gevent import monkey
monkey.patch_all()
import cProfile
import pstats
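
The ordering constraint is the usual gevent one: patch_all() replaces socket, ssl, threading and friends in place, so anything imported before it keeps the blocking stdlib behavior. A minimal illustration:

from gevent import monkey
monkey.patch_all()   # must run before everything below

import socket        # now cooperative: blocking calls yield to other greenlets
import threading     # Thread objects are greenlet-backed after patching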
@@ -598,6 +598,7 @@ def worker_main(config):
port = proxy_info['port']
proto = proxy_info.get('proto', 'http')
failed = proxy_info.get('failed', 0)
source_proto = proxy_info.get('source_proto')
proxy_str = '%s:%d' % (ip, port)
# Create state for this proxy
@@ -607,7 +608,7 @@ def worker_main(config):
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=proxy_str
proxy_full=proxy_str, source_proto=source_proto
)
pending_states[proxy_str] = state
@@ -780,6 +781,7 @@ def main():
config.watchd.database,
stats_provider=httpd_stats_provider,
profiling=profiling,
url_database=config.ppf.database,
)
httpd_server.start()
@@ -838,9 +840,6 @@ def main():
else:
_log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
if not _proxylist: _proxylist = None
for thread in threads:
if thread.status == 'ok':
url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
@@ -857,6 +856,9 @@ def main():
threads = [ thread for thread in threads if thread.is_alive() ]
if len(threads) < config.ppf.threads and rows:
# Only query proxydb when actually starting a new thread (reduces GIL blocking)
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
if not _proxylist: _proxylist = None
p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
row = random.choice(rows)
urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))
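
One detail worth noting in the moved code: the min(5, len(...)) guard keeps random.sample() from raising ValueError when fewer than five working proxies exist. A self-contained illustration:

import random
_proxylist = ['http://192.0.2.1:3128', 'socks5://192.0.2.2:1080']  # illustrative
p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
print(p)   # both entries in random order; None if the list were empty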

@@ -285,12 +285,12 @@ class ProxyTestState(object):
'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
'cert_error'
'cert_error', 'source_proto', 'protos_working'
)
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
country, mitm, consecutive_success, asn=None, oldies=False,
completion_queue=None, proxy_full=None):
completion_queue=None, proxy_full=None, source_proto=None):
self.ip = ip
self.port = int(port)
self.proxy = '%s:%s' % (ip, port)
@@ -326,6 +326,9 @@ class ProxyTestState(object):
self.had_ssl_test = False
self.ssl_success = False
self.cert_error = False
# Protocol fingerprinting
self.source_proto = source_proto
self.protos_working = None
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
"""Record a single target test result. Thread-safe.
@@ -432,7 +435,22 @@ class ProxyTestState(object):
if config.watchd.debug:
_log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')
self.proto = last_good['proto']
# Collect all distinct working protocols
working_protos = set()
for s in successes:
if s.get('proto'):
working_protos.add(s['proto'])
if working_protos:
self.protos_working = ','.join(sorted(working_protos))
# Pick most specific protocol: socks5 > socks4 > http
for best in ('socks5', 'socks4', 'http'):
if best in working_protos:
self.proto = best
break
else:
self.proto = last_good['proto']
else:
self.proto = last_good['proto']
self.failcount = 0
# Only reset mitm after 3 consecutive clean successes (not on first success)
# and only if this test didn't detect MITM
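
A self-contained sketch of the selection logic above, with a hypothetical successes list (the for/else keeps last_good's proto only when none of the named protocols matched):

successes = [{'proto': 'http'}, {'proto': 'socks5'}, {'proto': 'http'}]
working = sorted({s['proto'] for s in successes if s.get('proto')})
protos_working = ','.join(working)            # 'http,socks5'
for best in ('socks5', 'socks4', 'http'):     # most specific first
    if best in working:
        proto = best                          # -> 'socks5'
        break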
@@ -598,6 +616,45 @@ class TargetTestJob(object):
finally:
sock.disconnect()
def _build_proto_order(self):
"""Build smart protocol test order based on available intelligence.
Priority:
1. Previously successful proto (if set)
2. Source-detected proto hint (if different)
3. Remaining protos in default order: socks5, socks4, http
For failing proxies (failcount > 0 and proto known), only retest
with the known proto to save resources.
"""
ps = self.proxy_state
default_order = ['socks5', 'socks4', 'http']
# Known proto from previous test: only retest that
if ps.proto is not None:
# For failing proxies, skip multi-proto discovery
if ps.failcount > 0:
return [ps.proto]
# For working proxies, lead with known proto but try others
protos = [ps.proto]
# Add source hint if different
if ps.source_proto and ps.source_proto != ps.proto:
protos.append(ps.source_proto)
# Fill remaining
for p in default_order:
if p not in protos:
protos.append(p)
return protos
# Unknown proto: use source hint if available
protos = []
if ps.source_proto:
protos.append(ps.source_proto)
for p in default_order:
if p not in protos:
protos.append(p)
return protos
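
Worked outcomes for a few representative states, assuming only the attributes shown:

# proto     failcount  source_proto  -> _build_proto_order() result
# None      0          None          -> ['socks5', 'socks4', 'http']
# None      0          'http'        -> ['http', 'socks5', 'socks4']
# 'http'    0          'socks4'      -> ['http', 'socks4', 'socks5']
# 'http'    2          (any)         -> ['http']   (no rediscovery)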
def _connect_and_test(self):
"""Connect to target through the proxy and send test packet.
@@ -615,7 +672,7 @@ class TargetTestJob(object):
_log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
protos = self._build_proto_order()
pool = connection_pool.get_pool()
# Phase 1: SSL handshake (if ssl_first enabled)
@@ -882,7 +939,15 @@ class WorkerThread():
nao = time.time()
# Assign worker ID for connection pool affinity
job.worker_id = self.id
job.run()
try:
job.run()
except Exception as e:
# Ensure state completes on unexpected exceptions (prevents memory leak)
_log('job exception: %s' % e, 'error')
try:
job.proxy_state.record_result(False, category='exception')
except Exception:
pass # State may already be completed
spent = time.time() - nao
job_count += 1
duration_total += spent
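
The consumer side explains the memory-leak comment: states sit in pending_states until they surface on completion_queue, so a job that raises without recording any result would strand its entry forever. A simplified sketch of that collection step (names taken from the worker loop earlier in this diff):

while pending_states:
    state = completion_queue.get()          # blocks until a state completes
    pending_states.pop(state.proxy, None)   # safe to forget it now
    # the record_result(False, category='exception') fallback above
    # guarantees even a crashed job still reaches this point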
@@ -1251,7 +1316,7 @@ class Proxywatchd():
# Build due condition using new schedule formula
due_sql, due_params = _build_due_sql()
q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
_dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
config.watchd.working_checktime, config.watchd.fail_retry_interval,
config.watchd.fail_retry_backoff, config.watchd.max_fail))
@@ -1271,7 +1336,7 @@ class Proxywatchd():
now = time.time()
oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
mitm,consecutive_success,asn,proxy FROM proxylist
mitm,consecutive_success,asn,proxy,source_proto FROM proxylist
WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
ORDER BY RANDOM()'''
rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
@@ -1314,12 +1379,12 @@ class Proxywatchd():
for row in rows:
# create shared state for this proxy
# row: ip, port, proto, failed, success_count, total_duration,
# country, mitm, consecutive_success, asn, proxy
# country, mitm, consecutive_success, asn, proxy, source_proto
state = ProxyTestState(
row[0], row[1], row[2], row[3], row[4], row[5],
row[6], row[7], row[8], asn=row[9],
oldies=self.isoldies, completion_queue=self.completion_queue,
proxy_full=row[10]
proxy_full=row[10], source_proto=row[11]
)
new_states.append(state)
@@ -1424,7 +1489,7 @@ class Proxywatchd():
dead_count += 1
args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.proxy))
job.consecutive_success, job.asn, job.protos_working, job.proxy))
success_rate = (float(sc) / len(self.collected)) * 100
ret = True
@@ -1438,7 +1503,7 @@ class Proxywatchd():
if job.failcount == 0:
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.proxy))
job.consecutive_success, job.asn, job.protos_working, job.proxy))
if job.last_latency_ms is not None:
latency_updates.append((job.proxy, job.last_latency_ms))
ret = False
@@ -1455,7 +1520,7 @@ class Proxywatchd():
if job.failcount == 0 and job.exit_ip]
with self._db_context() as db:
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?'
db.executemany(query, args)
# Batch update latency metrics for successful proxies
@@ -1699,7 +1764,8 @@ class Proxywatchd():
config.httpd.listenip,
config.httpd.port,
config.watchd.database,
stats_provider=self.get_runtime_stats
stats_provider=self.get_runtime_stats,
url_database=config.ppf.database,
)
self.httpd_server.start()
@@ -1734,27 +1800,32 @@ class Proxywatchd():
sleeptime -= 1
continue
# check if job queue is empty (work-stealing: threads pull as needed)
if self.job_queue.empty():
# Skip job processing when threads=0 (master-only mode)
if config.watchd.threads > 0:
# check if job queue is empty (work-stealing: threads pull as needed)
if self.job_queue.empty():
self.collect_work()
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
job_count = self.prepare_jobs()
if job_count == 0:
# no jobs available, wait before checking again
sleeptime = 10
if not self.in_background: # single_thread scenario
self.threads[0].workloop()
self.collect_work()
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
job_count = self.prepare_jobs()
if job_count == 0:
# no jobs available, wait before checking again
sleeptime = 10
if not self.in_background: # single_thread scenario
self.threads[0].workloop()
self.collect_work()
if len(self.collected) > self.submit_after:
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
if len(self.collected) > self.submit_after:
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
# Master-only mode: sleep to avoid busy loop
sleeptime = 10
# Update rate history for sparklines
self.stats.update_history()
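
Condensed, the restructured loop has this shape (supervise is a hypothetical name; the local-testing block is elided to a comment):

import time

def supervise(cfg, tick=1):
    sleeptime = 0
    while True:
        if sleeptime > 0:
            time.sleep(tick)
            sleeptime -= 1
            continue
        if cfg.threads > 0:
            pass             # prepare_jobs / collect_work / submit_collected
        else:
            sleeptime = 10   # master-only mode: avoid a busy loop
        # rate history updates run on every pass either way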