ppf: add worker_v2_main() for URL-driven discovery

This commit is contained in:
Username
2026-02-17 14:23:58 +01:00
parent 0685c2bc4c
commit 862eeed5c8

412
ppf.py
View File

@@ -772,6 +772,411 @@ def worker_main(config):
_log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
"""V2 worker mode -- URL-driven discovery.
Claims URLs from master, fetches through Tor, extracts and tests proxies,
reports working proxies back to master.
"""
import json
global urllib2
try:
import Queue
except ImportError:
import queue as Queue
import proxywatchd
proxywatchd.set_config(config)
server_url = config.args.server
if not server_url:
_log('--server URL required for worker mode', 'error')
sys.exit(1)
worker_key = config.args.worker_key
worker_name = config.args.worker_name or os.uname()[1]
num_threads = config.watchd.threads
url_batch_size = config.worker.url_batch_size
# Register if --register flag or no key provided
if config.args.register or not worker_key:
_log('registering with master: %s' % server_url, 'info')
worker_id, worker_key = worker_register(server_url, worker_name)
if not worker_key:
_log('registration failed, exiting', 'error')
sys.exit(1)
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
_log('worker key: %s' % worker_key, 'info')
_log('save this key with --worker-key for future runs', 'info')
if config.args.register:
return
_log('starting worker V2 mode (URL-driven)', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before starting
import socks
working_tor_hosts = []
for tor_host in config.torhosts:
host, port = tor_host.split(':')
port = int(port)
try:
test_sock = socks.socksocket()
test_sock.set_proxy(socks.SOCKS5, host, port)
test_sock.settimeout(10)
test_sock.connect(('check.torproject.org', 80))
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
resp = test_sock.recv(512)
test_sock.close()
if resp and (b'HTTP/' in resp or len(resp) > 0):
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
working_tor_hosts.append(tor_host)
else:
_log('tor host %s:%d no response' % (host, port), 'warn')
except Exception as e:
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
if not working_tor_hosts:
_log('no working Tor hosts, cannot start worker', 'error')
sys.exit(1)
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
completion_queue = Queue.Queue()
# Spawn worker threads
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10)
_log('spawned %d worker threads' % len(threads), 'info')
# Session for fetching URLs through Tor
session = fetch.FetchSession()
cycles = 0
urls_fetched = 0
proxies_found = 0
proxies_working = 0
start_time = time.time()
current_tor_ip = None
consecutive_tor_failures = 0
worker_profiling = config.args.profile or config.common.profiling
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
_log('registering with master: %s' % server_url, 'info')
new_id, new_key = worker_register(server_url, worker_name)
if new_key:
wstate['worker_id'] = new_id
wstate['worker_key'] = new_key
wstate['backoff'] = 10
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
return True
else:
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
time.sleep(wstate['backoff'])
wstate['backoff'] = min(wstate['backoff'] * 2, 300)
def wait_for_tor():
"""Wait for Tor to become available, checking every 30 seconds."""
check_interval = 30
while True:
working, tor_ip = check_tor_connectivity(config.torhosts)
if working:
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
return working, tor_ip
_log('tor still down, retrying in %ds' % check_interval, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
time.sleep(check_interval)
try:
while True:
# Tor connectivity check
working, tor_ip = check_tor_connectivity(config.torhosts)
if not working:
consecutive_tor_failures += 1
_log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
if consecutive_tor_failures >= 2:
_log('tor appears down, waiting before claiming URLs', 'error')
working, current_tor_ip = wait_for_tor()
consecutive_tor_failures = 0
else:
time.sleep(10)
continue
else:
consecutive_tor_failures = 0
if tor_ip != current_tor_ip:
if current_tor_ip:
_log('tor circuit rotated: %s' % tor_ip, 'info')
current_tor_ip = tor_ip
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
# Claim URLs from master
try:
url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
except NeedReregister:
do_register()
continue
if not url_infos:
_log('no URLs available, sleeping 30s', 'info')
time.sleep(30)
continue
_log('claimed %d URLs to process' % len(url_infos), 'info')
# Phase 1: Fetch URLs and extract proxies
url_reports = []
all_extracted = [] # list of (addr, proto, confidence, source_url)
for url_info in url_infos:
url = url_info.get('url', '')
last_hash = url_info.get('last_hash')
proto_hint = url_info.get('proto_hint')
fetch_start = time.time()
try:
content = session.fetch(url)
except Exception as e:
_log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
content = None
fetch_time_ms = int((time.time() - fetch_start) * 1000)
urls_fetched += 1
if not content:
url_reports.append({
'url': url,
'success': False,
'content_hash': None,
'proxy_count': 0,
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': 'fetch failed',
})
continue
# Detect protocol from URL path
proto = fetch.detect_proto_from_path(url) or proto_hint
# Extract proxies (no filter_known -- workers have no proxydb)
extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
# Compute hash of extracted proxy list
content_hash = dbs.compute_proxy_list_hash(extracted)
if content_hash and last_hash and content_hash == last_hash:
# Content unchanged
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
continue
# Content changed or first fetch
for addr, pr, conf in extracted:
all_extracted.append((addr, pr, conf, url))
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': True,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
# Report URL health to master
if url_reports:
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
do_register()
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
_log('still rejected after re-register, discarding url reports', 'error')
# Deduplicate extracted proxies by address
seen = set()
unique_proxies = []
source_map = {} # addr -> first source_url
for addr, pr, conf, source_url in all_extracted:
if addr not in seen:
seen.add(addr)
unique_proxies.append((addr, pr, conf))
source_map[addr] = source_url
proxies_found += len(unique_proxies)
if not unique_proxies:
cycles += 1
time.sleep(1)
continue
_log('testing %d unique proxies' % len(unique_proxies), 'info')
# Phase 2: Test extracted proxies using worker thread pool
pending_states = {}
all_jobs = []
checktypes = config.watchd.checktypes
for addr, pr, conf in unique_proxies:
# Parse ip:port from addr (may contain auth: user:pass@ip:port)
addr_part = addr.split('@')[-1] if '@' in addr else addr
# Handle IPv6 [ipv6]:port
if addr_part.startswith('['):
bracket_end = addr_part.index(']')
ip = addr_part[1:bracket_end]
port = int(addr_part[bracket_end+2:])
else:
ip, port_str = addr_part.rsplit(':', 1)
port = int(port_str)
proto = pr or 'http'
proxy_str = '%s:%d' % (ip, port)
state = proxywatchd.ProxyTestState(
ip, port, proto, 0,
success_count=0, total_duration=0.0,
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=addr, source_proto=pr
)
pending_states[proxy_str] = state
checktype = random.choice(checktypes)
if checktype == 'judges':
available = proxywatchd.judge_stats.get_available_judges(
list(proxywatchd.judges.keys()))
target = random.choice(available) if available else random.choice(
list(proxywatchd.judges.keys()))
elif checktype == 'ssl':
target = random.choice(proxywatchd.ssl_targets)
elif checktype == 'irc':
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
else: # head
target = random.choice(list(proxywatchd.regexes.keys()))
job = proxywatchd.TargetTestJob(state, target, checktype)
all_jobs.append(job)
random.shuffle(all_jobs)
for job in all_jobs:
job_queue.put(job, priority=0)
# Wait for completion
completed = 0
timeout_start = time.time()
timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
working_results = []
while completed < len(all_jobs):
try:
state = completion_queue.get(timeout=1)
completed += 1
if state.failcount == 0:
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
proxy_addr = state.proxy
if state.auth:
proxy_addr = '%s@%s' % (state.auth, state.proxy)
working_results.append({
'ip': state.ip,
'port': state.port,
'proto': state.proto,
'source_proto': state.source_proto,
'latency': round(latency_sec, 3),
'exit_ip': state.exit_ip,
'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
})
if completed % 50 == 0 or completed == len(all_jobs):
_log('tested %d/%d proxies (%d working)' % (
completed, len(all_jobs), len(working_results)), 'info')
except Queue.Empty:
if time.time() - timeout_start > timeout_seconds:
_log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
break
continue
proxies_working += len(working_results)
# Report working proxies to master
if working_results:
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
do_register()
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
_log('still rejected after re-register, discarding proxy reports', 'error')
processed = 0
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
cycles += 1
time.sleep(1)
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker V2 stopping...', 'info')
session.close()
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' cycles: %d' % cycles, 'info')
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
_log(' proxies working: %d' % proxies_working, 'info')
def main():
"""Main entry point."""
global config
@@ -784,7 +1189,12 @@ def main():
else:
sys.exit(1)
# Worker mode: connect to master server instead of running locally
# V2 worker mode: URL-driven discovery
if config.args.worker_v2:
worker_v2_main(config)
return
# V1 worker mode: connect to master server instead of running locally
if config.args.worker or config.args.register:
worker_main(config)
return