Files
ppf/ppf.py
Username dfcd8f0c00 add test provenance columns and worker report fields
Add last_check/last_target columns to proxylist schema with migration.
Include checktype and target in V2 worker report payload.
2026-02-17 21:06:21 +01:00

1397 lines
53 KiB
Python

#!/usr/bin/env python2
__version__ = '2.0.0'
import sys
import os
# Gevent monkey-patch MUST happen before any other imports
# Both master (httpd) and worker modes use gevent for async I/O
from gevent import monkey
monkey.patch_all()
import cProfile
import pstats
import signal
import dbs
import time
import mysqlite
from misc import _log
from config import Config
import fetch
from soup_parser import set_nobs
import threading
import random
import json
# Global profiler for signal handler access
_profiler = None
# Handle SIGTERM gracefully (for container stop)
def sigterm_handler(signum, frame):
    """SIGTERM handler: flush profiler output (when profiling is active)
    and convert the signal into a KeyboardInterrupt so the normal
    KeyboardInterrupt shutdown paths used throughout this module run.

    :param signum: signal number (unused)
    :param frame: current stack frame (unused)
    """
    global _profiler
    if _profiler:
        # Persist collected stats before the process dies.
        _profiler.disable()
        _profiler.dump_stats('data/profile.stats')
        _log('profile stats written to data/profile.stats (SIGTERM)', 'info')
    raise KeyboardInterrupt
signal.signal(signal.SIGTERM, sigterm_handler)
# Module-level configuration shared by every mode (master, worker, V2 worker).
config = Config()
def reset_state():
    """Clear all proxy and session state for a fresh start.

    Empties the proxylist, session_state and stats_history tables
    (the latter two best-effort, since they may not exist yet),
    vacuums the database, then removes the scraper/MITM JSON state
    files next to the database.

    Returns True on success, False when the database step fails
    (state files are not touched in that case).
    """
    db_path = config.watchd.database
    data_dir = os.path.dirname(db_path) or 'data'
    # Clear database tables first; abort before touching files on failure.
    try:
        conn = mysqlite.mysqlite(db_path, str)
        dbs.create_table_if_not_exists(conn, 'proxylist')
        conn.execute('DELETE FROM proxylist')
        conn.commit()
        _log('cleared proxylist table', 'reset')
        # Optional tables -- swallow errors when a table does not exist.
        for table in ('session_state', 'stats_history'):
            try:
                conn.execute('DELETE FROM ' + table)
                conn.commit()
                _log('cleared %s table' % table, 'reset')
            except Exception:
                pass
        conn.execute('VACUUM')
        conn.commit()
        conn.close()
    except Exception as e:
        _log('database reset failed: %s' % e, 'error')
        return False
    # Remove on-disk state files (best effort, logged individually).
    for fname in ('scraper_state.json', 'mitm_certs.json'):
        path = os.path.join(data_dir, fname)
        if os.path.exists(path):
            try:
                os.remove(path)
                _log('removed %s' % path, 'reset')
            except Exception as e:
                _log('failed to remove %s: %s' % (path, e), 'error')
    _log('state reset complete', 'reset')
    return True
def format_duration(seconds):
    """Format seconds into compact human-readable duration.

    Examples: 45 -> '45s', 3660 -> '1h 1m', 90000 -> '1d 1h'.
    Sub-units of zero are omitted ('1h', not '1h 0m').
    """
    if seconds < 60:
        return '%ds' % seconds
    if seconds < 3600:
        return '%dm' % (seconds // 60)
    if seconds < 86400:
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        if minutes:
            return '%dh %dm' % (hours, minutes)
        return '%dh' % hours
    days = seconds // 86400
    hours = (seconds % 86400) // 3600
    if hours:
        return '%dd %dh' % (days, hours)
    return '%dd' % days
def import_from_file(fn, urldb):
    """Import URLs from a text file into the database.

    Reads one URL per line from *fn* (blank lines skipped) and inserts
    them in chunks of 200 rows, tagged with the source filename.
    A missing file is silently ignored so the import file is optional.

    :param fn: path to the text file of URLs
    :param urldb: open URL database handle passed to dbs.insert_urls
    """
    try:
        with open(fn, 'r') as f:
            urls = [url.strip() for url in f if url.strip()]
    except IOError:
        return  # File not found, silently skip
    # Insert in chunks of 200 to keep individual statements bounded.
    for i in range(0, len(urls), 200):
        # Fix: tag rows with the actual source file instead of the
        # hard-coded 'import.txt' label (identical for the current
        # caller, correct for any other import file).
        dbs.insert_urls(urls[i:i+200], fn, urldb)
def get_content_type(url, proxy):
    """Return the lowercased Content-Type header value of *url*.

    Issues a HEAD request through *proxy* and scans the raw header
    lines; returns '' when no Content-Type header is present.
    """
    headers = fetch.fetch_contents(url, head=True, proxy=proxy)
    for line in headers.split('\n'):
        lowered = line.lower()
        if lowered.startswith('content-type: '):
            return lowered.split(':')[1].strip()
    return ''
def is_good_content_type(string):
    """Return True when the Content-Type value is one we parse for proxies.

    Matches case-insensitively against a substring allow-list
    (text/html, text/plain, atom+xml).
    """
    allowed_ct = ('text/html', 'text/plain', 'atom+xml')
    lowered = string.lower()
    return any(ct in lowered for ct in allowed_ct)
def import_proxies_from_file(proxydb, fn):
    """Import proxy addresses from a local text file.

    Extracts proxies from the file contents (protocol guessed from the
    filename, e.g. socks5.txt) and inserts any previously unknown ones.

    :param proxydb: open proxy database handle
    :param fn: path to the proxy list file
    :returns: 0 when new proxies were inserted, 1 otherwise
              (used directly as a process exit code by main()).
    """
    # Fix: use a context manager so the file handle is always closed
    # (the original open(fn).read() leaked the handle).
    with open(fn, 'r') as f:
        content = f.read()
    # Detect protocol from filename (e.g., socks5.txt, http-proxies.txt)
    proto = fetch.detect_proto_from_path(fn)
    unique_count, new = fetch.extract_proxies(content, proxydb, proto=proto)
    if new:
        dbs.insert_proxies(proxydb, new, fn)
        return 0
    return 1
class Leechered(threading.Thread):
    """One fetch-and-extract attempt for a single proxy-list URL.

    Runs in its own thread: fetches the URL (optionally through a proxy),
    extracts proxy addresses, detects unchanged content via a hash of the
    extracted list, and updates the per-site error/stale counters.  The
    caller reads the outcome via retrieve() and self.execute (the tuple
    of column values to write back for this URL row).
    """
    def __init__(self, url, stale_count, error, retrievals, proxies_added, content_type, content_hash, proxy):
        # 'nok' until run() completes successfully.
        self.status = 'nok'
        # Newly discovered (address, proto, confidence) tuples.
        self.proxylist = []
        # NOTE(review): 'running' is never read in this class -- presumably
        # polled by the caller; confirm before removing.
        self.running = True
        self.url = url
        # Counters carried over from the URL's database row.
        self.stale_count = stale_count
        self.error = error
        self.retrievals = retrievals
        self.proxies_added = proxies_added
        self.content_type = content_type
        # Hash of the proxy list from the previous fetch (change detection).
        self.content_hash = content_hash
        self.new_hash = None
        self.hash_unchanged = False
        self.proxy = proxy
        # Tuple of updated column values, filled in by run().
        self.execute = ''
        threading.Thread.__init__(self)
    def retrieve(self):
        """Return the fetch outcome as a flat tuple for the caller."""
        return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute
    def run(self):
        """Fetch the URL, extract proxies, and update error/stale counters."""
        self.status = 'nok'
        try:
            # Resolve the content type once (HEAD request) if unknown.
            if not self.content_type: self.content_type = get_content_type(self.url, self.proxy)
            if is_good_content_type(self.content_type):
                try:
                    content = fetch.fetch_contents(self.url, proxy=self.proxy)
                except KeyboardInterrupt as e:
                    raise e
                except Exception as e:
                    # Python 2: repr() may yield unicode; force ascii for _log.
                    try:
                        err_msg = repr(e)
                        if isinstance(err_msg, unicode):
                            err_msg = err_msg.encode('ascii', 'backslashreplace')
                    except:
                        err_msg = type(e).__name__
                    _log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error')
                    content = ''
            else:
                # Unsupported content type -- treat as an empty fetch.
                content = ''
            # Detect protocol from source URL (e.g., .../socks5/list.txt)
            proto = fetch.detect_proto_from_path(self.url)
            unique = fetch.extract_proxies(content, filter_known=False, proto=proto)
            # Compute hash of all extracted proxies for change detection
            self.new_hash = dbs.compute_proxy_list_hash(unique)
            # Check if content unchanged (same proxies as last time)
            if self.new_hash and self.content_hash and self.new_hash == self.content_hash:
                self.hash_unchanged = True
                self.proxylist = []
                self.stale_count += 1
                # Next check is pushed back proportionally to error+stale count.
                next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime
                _log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale')
                # Content unchanged - increment stale_count, update check_time
                self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
                self.status = 'ok'
                return
            # Content changed or first fetch - reset stale_count, proceed with normal processing
            self.stale_count = 0
            # unique is list of (address, proto, confidence) tuples; filter by address, keep tuple
            self.proxylist = [(addr, pr, conf) for addr, pr, conf in unique if not fetch.is_known_proxy(addr)]
            proxy_count = len(self.proxylist)
            if self.retrievals == 0: # new site
                if content and not self.proxylist: # site works but has zero proxy addresses
                    self.error += 1
                    self.stale_count += 1
                elif proxy_count:
                    self.error = 0
                    self.stale_count = 0
                else:
                    # No content at all on first fetch: penalize harder.
                    self.error += 2
                    self.stale_count += 2
            else: # not a new site
                # proxylist is empty
                if not proxy_count:
                    self.stale_count += 1
                # proxylist is not empty: site is working
                else:
                    self.stale_count = 0
                    self.error = 0
                # site has no content
                if not content:
                    self.error += 1
                    self.stale_count += 1
                # site has proxies
                if proxy_count:
                    self.error = 0
                    self.stale_count = 0
            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url)
            self.status = 'ok'
        except KeyboardInterrupt:
            raise
        except Exception as e:
            # Build a loggable host/message; never let logging itself raise.
            try:
                host = self.url.split('/')[2] if '/' in self.url else self.url
                err_msg = repr(e)
                if isinstance(err_msg, unicode):
                    err_msg = err_msg.encode('ascii', 'backslashreplace')
            except:
                host = 'unknown'
                err_msg = type(e).__name__
            _log('%s: thread error: %s' % (host, err_msg), 'error')
            # Set error state so site gets retried later
            self.error += 1
            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
            self.status = 'nok'
# Worker mode imports (lazy loaded)
try:
import urllib2
import socket
except ImportError:
urllib2 = None
def worker_register(server_url, name, master_key=''):
    """Register this worker with the master and obtain credentials.

    POSTs the worker name and optional master key as JSON to
    /api/register.

    :returns: (worker_id, worker_key) on success, (None, None) on any
              failure (logged as an error).
    """
    payload = json.dumps({'name': name, 'master_key': master_key})
    request = urllib2.Request(server_url.rstrip('/') + '/api/register', payload)
    request.add_header('Content-Type', 'application/json')
    try:
        reply = json.loads(urllib2.urlopen(request, timeout=30).read())
        return reply.get('worker_id'), reply.get('worker_key')
    except Exception as e:
        _log('registration failed: %s' % e, 'error')
        return None, None
class NeedReregister(Exception):
    """Signals that the master rejected our worker key (HTTP 403) and
    the caller must obtain fresh credentials via registration."""
def worker_get_work(server_url, worker_key, count=100):
    """Fetch a batch of proxies to test from the master.

    :returns: list of proxy descriptors, [] on failure.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    endpoint = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
    try:
        raw = urllib2.urlopen(endpoint, timeout=30).read()
        return json.loads(raw).get('proxies', [])
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to get work: %s' % e, 'error')
        return []
    except Exception as e:
        _log('failed to get work: %s' % e, 'error')
        return []
def worker_submit_results(server_url, worker_key, results):
    """POST proxy test results to the master.

    :returns: the master's 'processed' count, 0 on failure.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    request = urllib2.Request(
        '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key),
        json.dumps({'results': results}))
    request.add_header('Content-Type', 'application/json')
    try:
        reply = json.loads(urllib2.urlopen(request, timeout=30).read())
        return reply.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to submit results: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to submit results: %s' % e, 'error')
        return 0
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
    """Report liveness and Tor status to the master.

    :returns: True when the POST succeeds, False otherwise.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    payload = json.dumps({
        'tor_ok': tor_ok,
        'tor_ip': tor_ip,
        'profiling': profiling,
        'threads': threads,
    })
    request = urllib2.Request(
        '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key), payload)
    request.add_header('Content-Type', 'application/json')
    try:
        urllib2.urlopen(request, timeout=10)
        return True
    except urllib2.HTTPError as e:
        if e.code == 403:
            raise NeedReregister()
        return False
    except Exception:
        return False
def worker_claim_urls(server_url, worker_key, count=5):
    """Claim a batch of source URLs from the master (V2 worker mode).

    :returns: list of URL descriptors, [] on failure.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    endpoint = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
    try:
        raw = urllib2.urlopen(endpoint, timeout=30).read()
        return json.loads(raw).get('urls', [])
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to claim urls: %s' % e, 'error')
        return []
    except Exception as e:
        _log('failed to claim urls: %s' % e, 'error')
        return []
def worker_report_urls(server_url, worker_key, reports):
    """Send URL fetch outcomes back to the master (V2 worker mode).

    :returns: the master's 'processed' count, 0 on failure.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    request = urllib2.Request(
        '%s/api/report-urls?key=%s' % (server_url.rstrip('/'), worker_key),
        json.dumps({'reports': reports}))
    request.add_header('Content-Type', 'application/json')
    try:
        reply = json.loads(urllib2.urlopen(request, timeout=30).read())
        return reply.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report urls: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report urls: %s' % e, 'error')
        return 0
def worker_report_proxies(server_url, worker_key, proxies):
    """Report working proxies discovered by this worker to the master.

    :returns: the master's 'processed' count, 0 on failure.
    :raises NeedReregister: when the master rejects the key (HTTP 403).
    """
    request = urllib2.Request(
        '%s/api/report-proxies?key=%s' % (server_url.rstrip('/'), worker_key),
        json.dumps({'proxies': proxies}))
    request.add_header('Content-Type', 'application/json')
    try:
        reply = json.loads(urllib2.urlopen(request, timeout=30).read())
        return reply.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report proxies: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report proxies: %s' % e, 'error')
        return 0
def check_tor_connectivity(tor_hosts):
    """Test Tor connectivity. Returns (working_hosts, tor_ip).

    Opens a SOCKS5 connection through each 'host:port' entry to
    check.torproject.org and requests /api/ip; any HTTP response marks
    the host as working, and the exit IP is parsed from the JSON body
    when available.

    :param tor_hosts: iterable of 'host:port' Tor SOCKS endpoints
    :returns: (list of responsive hosts, last observed exit IP or None)
    """
    # Fix: dropped unused 'import socket' (only socks is needed here).
    import socks
    working = []
    tor_ip = None
    for tor_host in tor_hosts:
        host, port = tor_host.split(':')
        port = int(port)
        test_sock = None
        try:
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(15)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET /api/ip HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(1024)
            if resp and b'HTTP/' in resp:
                working.append(tor_host)
                # Extract IP from JSON response body
                if b'\r\n\r\n' in resp:
                    body = resp.split(b'\r\n\r\n', 1)[1]
                    try:
                        data = json.loads(body)
                        tor_ip = data.get('IP')
                    except Exception:
                        pass
        except Exception:
            pass
        finally:
            # Fix: always close the socket -- the original leaked it when
            # connect/send/recv raised before the success-path close().
            if test_sock is not None:
                try:
                    test_sock.close()
                except Exception:
                    pass
    return working, tor_ip
def worker_main(config):
    """Worker mode main loop - uses proxywatchd multi-threaded testing.

    Registers with the master (when requested or no key is given),
    verifies Tor connectivity, spawns a pool of proxywatchd worker
    threads, then loops forever: claim a batch of proxies, queue one
    test job per proxy, collect completions, and submit results.
    Runs until KeyboardInterrupt (or SIGTERM, which is converted to
    KeyboardInterrupt by the module's signal handler).
    """
    import json
    global urllib2
    # Python 2/3 queue module compatibility.
    try:
        import Queue
    except ImportError:
        import queue as Queue
    # Import proxywatchd for multi-threaded testing (gevent already patched at top)
    import proxywatchd
    proxywatchd.set_config(config)
    server_url = config.args.server
    if not server_url:
        _log('--server URL required for worker mode', 'error')
        sys.exit(1)
    worker_key = config.args.worker_key
    worker_name = config.args.worker_name or os.uname()[1]
    batch_size = config.worker.batch_size
    num_threads = config.watchd.threads
    worker_id = None
    # Register if --register flag or no key provided
    if config.args.register or not worker_key:
        _log('registering with master: %s' % server_url, 'info')
        worker_id, worker_key = worker_register(server_url, worker_name)
        if not worker_key:
            _log('registration failed, exiting', 'error')
            sys.exit(1)
        _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
        _log('worker key: %s' % worker_key, 'info')
        _log('save this key with --worker-key for future runs', 'info')
        if config.args.register:
            # Just register and exit
            return
    _log('starting worker mode', 'info')
    _log(' server: %s' % server_url, 'info')
    _log(' threads: %d' % num_threads, 'info')
    _log(' batch size: %d' % batch_size, 'info')
    _log(' tor hosts: %s' % config.common.tor_hosts, 'info')
    # Verify Tor connectivity before claiming work
    import socket
    import socks
    working_tor_hosts = []
    for tor_host in config.torhosts:
        host, port = tor_host.split(':')
        port = int(port)
        try:
            # Test SOCKS connection
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(10)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(512)
            test_sock.close()
            # Accept any HTTP response (200, 301, 302, etc.)
            if resp and (b'HTTP/' in resp or len(resp) > 0):
                status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
                _log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
                working_tor_hosts.append(tor_host)
            else:
                _log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn')
        except Exception as e:
            _log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
    if not working_tor_hosts:
        _log('no working Tor hosts, cannot start worker', 'error')
        sys.exit(1)
    _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
    # Create shared queues for worker threads
    job_queue = proxywatchd.PriorityJobQueue()
    completion_queue = Queue.Queue()
    # Spawn worker threads with stagger to avoid overwhelming Tor
    threads = []
    for i in range(num_threads):
        wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
        wt.start_thread()
        threads.append(wt)
        time.sleep(random.random() / 10) # 0-100ms stagger per thread
    _log('spawned %d worker threads' % len(threads), 'info')
    # Lifetime statistics, reported on shutdown.
    jobs_completed = 0
    proxies_tested = 0
    start_time = time.time()
    current_tor_ip = None
    consecutive_tor_failures = 0
    worker_profiling = config.args.profile or config.common.profiling
    # Use dict to allow mutation in nested function (Python 2 compatible)
    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
    def do_register():
        """Register with master, with exponential backoff on failure."""
        while True:
            _log('registering with master: %s' % server_url, 'info')
            new_id, new_key = worker_register(server_url, worker_name)
            if new_key:
                wstate['worker_id'] = new_id
                wstate['worker_key'] = new_key
                wstate['backoff'] = 10 # Reset backoff on success
                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
                return True
            else:
                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
                time.sleep(wstate['backoff'])
                wstate['backoff'] = min(wstate['backoff'] * 2, 300) # Max 5 min backoff
    def wait_for_tor():
        """Wait for Tor to become available, checking every 30 seconds."""
        check_interval = 30
        while True:
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if working:
                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
                # Send heartbeat to manager
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                return working, tor_ip
            _log('tor still down, retrying in %ds' % check_interval, 'warn')
            # Send heartbeat with tor_ok=False
            try:
                worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
            except NeedReregister:
                do_register()
            time.sleep(check_interval)
    try:
        while True:
            # Tor check before claiming work - don't claim if Tor is down
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if not working:
                consecutive_tor_failures += 1
                _log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                if consecutive_tor_failures >= 2:
                    _log('tor appears down, waiting before claiming work', 'error')
                    working, current_tor_ip = wait_for_tor()
                    consecutive_tor_failures = 0
                else:
                    time.sleep(10)
                continue
            else:
                consecutive_tor_failures = 0
                if tor_ip != current_tor_ip:
                    if current_tor_ip:
                        _log('tor circuit rotated: %s' % tor_ip, 'info')
                    current_tor_ip = tor_ip
                # Send heartbeat to manager
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
            # Get work from master
            try:
                proxies = worker_get_work(server_url, wstate['worker_key'], batch_size)
            except NeedReregister:
                do_register()
                continue
            if not proxies:
                _log('no work available, sleeping 30s', 'info')
                time.sleep(30)
                continue
            _log('received %d proxies to test' % len(proxies), 'info')
            # Create ProxyTestState and jobs for each proxy
            pending_states = {}
            all_jobs = []
            # Get checktype(s) from config
            checktypes = config.watchd.checktypes
            for proxy_info in proxies:
                ip = proxy_info['ip']
                port = proxy_info['port']
                proto = proxy_info.get('proto', 'http')
                failed = proxy_info.get('failed', 0)
                source_proto = proxy_info.get('source_proto')
                proxy_str = '%s:%d' % (ip, port)
                # Create state for this proxy
                state = proxywatchd.ProxyTestState(
                    ip, port, proto, failed,
                    success_count=0, total_duration=0.0,
                    country=None, mitm=0, consecutive_success=0,
                    asn=None, oldies=False,
                    completion_queue=completion_queue,
                    proxy_full=proxy_str, source_proto=source_proto
                )
                pending_states[proxy_str] = state
                # Select random checktype
                checktype = random.choice(checktypes)
                # Get target for this checktype
                if checktype == 'judges':
                    available = proxywatchd.judge_stats.get_available_judges(
                        list(proxywatchd.judges.keys()))
                    target = random.choice(available) if available else random.choice(
                        list(proxywatchd.judges.keys()))
                elif checktype == 'ssl':
                    target = random.choice(proxywatchd.ssl_targets)
                elif checktype == 'irc':
                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
                else: # head
                    target = random.choice(list(proxywatchd.regexes.keys()))
                job = proxywatchd.TargetTestJob(state, target, checktype)
                all_jobs.append(job)
            # Shuffle and queue jobs
            random.shuffle(all_jobs)
            for job in all_jobs:
                job_queue.put(job, priority=0)
            # Wait for all jobs to complete
            completed = 0
            results = []
            timeout_start = time.time()
            timeout_seconds = config.watchd.timeout * 2 + 30 # generous timeout
            while completed < len(proxies):
                try:
                    state = completion_queue.get(timeout=1)
                    completed += 1
                    # Build result from state (failcount == 0 means success)
                    is_working = state.failcount == 0
                    latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
                    result = {
                        'ip': state.ip,
                        'port': state.port,
                        'proto': state.proto,
                        'working': is_working,
                        'latency': round(latency_sec, 3) if is_working else 0,
                        'error': None if is_working else 'failed',
                    }
                    results.append(result)
                    # Progress logging
                    if completed % 20 == 0 or completed == len(proxies):
                        working = sum(1 for r in results if r.get('working'))
                        _log('tested %d/%d proxies (%d working)' % (
                            completed, len(proxies), working), 'info')
                except Queue.Empty:
                    if time.time() - timeout_start > timeout_seconds:
                        _log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn')
                        break
                    continue
            # Submit results
            try:
                processed = worker_submit_results(server_url, wstate['worker_key'], results)
            except NeedReregister:
                do_register()
                # Retry submission with new key
                try:
                    processed = worker_submit_results(server_url, wstate['worker_key'], results)
                except NeedReregister:
                    _log('still rejected after re-register, discarding batch', 'error')
                    processed = 0
            jobs_completed += 1
            proxies_tested += len(results)
            working = sum(1 for r in results if r.get('working'))
            _log('batch %d: %d/%d working, submitted %d' % (
                jobs_completed, working, len(results), processed), 'info')
            # Brief pause between batches
            time.sleep(1)
    except KeyboardInterrupt:
        # Graceful shutdown (also reached via the SIGTERM handler).
        elapsed = time.time() - start_time
        _log('worker stopping...', 'info')
        # Stop threads
        for wt in threads:
            wt.stop()
        for wt in threads:
            wt.term()
        _log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
        _log(' jobs completed: %d' % jobs_completed, 'info')
        _log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
    """V2 worker mode -- URL-driven discovery.

    Claims URLs from master, fetches through Tor, extracts and tests proxies,
    reports working proxies back to master.

    Each cycle has two phases: (1) fetch claimed source URLs, extract
    proxy addresses and report per-URL health (hash-based change
    detection); (2) queue test jobs for the deduplicated addresses on
    the proxywatchd thread pool and report the ones that pass back to
    the master.  Runs until KeyboardInterrupt.
    """
    import json
    global urllib2
    # Python 2/3 queue module compatibility.
    try:
        import Queue
    except ImportError:
        import queue as Queue
    import proxywatchd
    proxywatchd.set_config(config)
    server_url = config.args.server
    if not server_url:
        _log('--server URL required for worker mode', 'error')
        sys.exit(1)
    worker_key = config.args.worker_key
    worker_name = config.args.worker_name or os.uname()[1]
    num_threads = config.watchd.threads
    url_batch_size = config.worker.url_batch_size
    worker_id = None
    # Register if --register flag or no key provided
    if config.args.register or not worker_key:
        _log('registering with master: %s' % server_url, 'info')
        worker_id, worker_key = worker_register(server_url, worker_name)
        if not worker_key:
            _log('registration failed, exiting', 'error')
            sys.exit(1)
        _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
        _log('worker key: %s' % worker_key, 'info')
        _log('save this key with --worker-key for future runs', 'info')
        if config.args.register:
            return
    _log('starting worker V2 mode (URL-driven)', 'info')
    _log(' server: %s' % server_url, 'info')
    _log(' threads: %d' % num_threads, 'info')
    _log(' url batch: %d' % url_batch_size, 'info')
    _log(' tor hosts: %s' % config.common.tor_hosts, 'info')
    # Verify Tor connectivity before starting
    import socks
    working_tor_hosts = []
    for tor_host in config.torhosts:
        host, port = tor_host.split(':')
        port = int(port)
        try:
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(10)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(512)
            test_sock.close()
            # Accept any HTTP response at all as proof of connectivity.
            if resp and (b'HTTP/' in resp or len(resp) > 0):
                status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
                _log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
                working_tor_hosts.append(tor_host)
            else:
                _log('tor host %s:%d no response' % (host, port), 'warn')
        except Exception as e:
            _log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
    if not working_tor_hosts:
        _log('no working Tor hosts, cannot start worker', 'error')
        sys.exit(1)
    _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
    # Create shared queues for worker threads
    job_queue = proxywatchd.PriorityJobQueue()
    completion_queue = Queue.Queue()
    # Spawn worker threads
    threads = []
    for i in range(num_threads):
        wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
        wt.start_thread()
        threads.append(wt)
        # 0-100ms stagger per thread so Tor is not hit all at once.
        time.sleep(random.random() / 10)
    _log('spawned %d worker threads' % len(threads), 'info')
    # Session for fetching URLs through Tor
    session = fetch.FetchSession()
    # Lifetime statistics, reported on shutdown.
    cycles = 0
    urls_fetched = 0
    proxies_found = 0
    proxies_working = 0
    start_time = time.time()
    current_tor_ip = None
    consecutive_tor_failures = 0
    worker_profiling = config.args.profile or config.common.profiling
    # Dict so nested closures can mutate credentials (Python 2 compatible).
    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
    def do_register():
        """Register with master, with exponential backoff on failure."""
        while True:
            _log('registering with master: %s' % server_url, 'info')
            new_id, new_key = worker_register(server_url, worker_name)
            if new_key:
                wstate['worker_id'] = new_id
                wstate['worker_key'] = new_key
                wstate['backoff'] = 10
                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
                return True
            else:
                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
                time.sleep(wstate['backoff'])
                wstate['backoff'] = min(wstate['backoff'] * 2, 300)
    def wait_for_tor():
        """Wait for Tor to become available, checking every 30 seconds."""
        check_interval = 30
        while True:
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if working:
                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                return working, tor_ip
            _log('tor still down, retrying in %ds' % check_interval, 'warn')
            try:
                worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
            except NeedReregister:
                do_register()
            time.sleep(check_interval)
    try:
        while True:
            # Tor connectivity check
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if not working:
                consecutive_tor_failures += 1
                _log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                if consecutive_tor_failures >= 2:
                    _log('tor appears down, waiting before claiming URLs', 'error')
                    working, current_tor_ip = wait_for_tor()
                    consecutive_tor_failures = 0
                else:
                    time.sleep(10)
                continue
            else:
                consecutive_tor_failures = 0
                if tor_ip != current_tor_ip:
                    if current_tor_ip:
                        _log('tor circuit rotated: %s' % tor_ip, 'info')
                    current_tor_ip = tor_ip
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
            # Claim URLs from master
            try:
                url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
            except NeedReregister:
                do_register()
                continue
            if not url_infos:
                _log('no URLs available, sleeping 30s', 'info')
                time.sleep(30)
                continue
            _log('claimed %d URLs to process' % len(url_infos), 'info')
            # Phase 1: Fetch URLs and extract proxies
            url_reports = []
            all_extracted = [] # list of (addr, proto, confidence, source_url)
            for url_info in url_infos:
                url = url_info.get('url', '')
                last_hash = url_info.get('last_hash')
                proto_hint = url_info.get('proto_hint')
                fetch_start = time.time()
                try:
                    content = session.fetch(url)
                except Exception as e:
                    _log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
                    content = None
                fetch_time_ms = int((time.time() - fetch_start) * 1000)
                urls_fetched += 1
                if not content:
                    url_reports.append({
                        'url': url,
                        'success': False,
                        'content_hash': None,
                        'proxy_count': 0,
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': 'fetch failed',
                    })
                    continue
                # Detect protocol from URL path
                proto = fetch.detect_proto_from_path(url) or proto_hint
                # Extract proxies (no filter_known -- workers have no proxydb)
                extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
                # Compute hash of extracted proxy list
                content_hash = dbs.compute_proxy_list_hash(extracted)
                if content_hash and last_hash and content_hash == last_hash:
                    # Content unchanged
                    url_reports.append({
                        'url': url,
                        'success': True,
                        'content_hash': content_hash,
                        'proxy_count': len(extracted),
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': None,
                    })
                    host = url.split('/')[2] if '/' in url else url
                    _log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
                    continue
                # Content changed or first fetch
                for addr, pr, conf in extracted:
                    all_extracted.append((addr, pr, conf, url))
                url_reports.append({
                    'url': url,
                    'success': True,
                    'content_hash': content_hash,
                    'proxy_count': len(extracted),
                    'fetch_time_ms': fetch_time_ms,
                    'changed': True,
                    'error': None,
                })
                host = url.split('/')[2] if '/' in url else url
                _log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
            # Report URL health to master
            if url_reports:
                try:
                    worker_report_urls(server_url, wstate['worker_key'], url_reports)
                except NeedReregister:
                    do_register()
                    # One retry with the fresh key, then give up on this batch.
                    try:
                        worker_report_urls(server_url, wstate['worker_key'], url_reports)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding url reports', 'error')
            # Deduplicate extracted proxies by address
            seen = set()
            unique_proxies = []
            source_map = {} # addr -> first source_url
            for addr, pr, conf, source_url in all_extracted:
                if addr not in seen:
                    seen.add(addr)
                    unique_proxies.append((addr, pr, conf))
                    source_map[addr] = source_url
            proxies_found += len(unique_proxies)
            if not unique_proxies:
                cycles += 1
                time.sleep(1)
                continue
            _log('testing %d unique proxies' % len(unique_proxies), 'info')
            # Phase 2: Test extracted proxies using worker thread pool
            pending_states = {}
            all_jobs = []
            checktypes = config.watchd.checktypes
            for addr, pr, conf in unique_proxies:
                # Parse ip:port from addr (may contain auth: user:pass@ip:port)
                addr_part = addr.split('@')[-1] if '@' in addr else addr
                # Handle IPv6 [ipv6]:port
                if addr_part.startswith('['):
                    bracket_end = addr_part.index(']')
                    ip = addr_part[1:bracket_end]
                    # +2 skips the ']' and the ':' that follows it.
                    port = int(addr_part[bracket_end+2:])
                else:
                    ip, port_str = addr_part.rsplit(':', 1)
                    port = int(port_str)
                proto = pr or 'http'
                proxy_str = '%s:%d' % (ip, port)
                state = proxywatchd.ProxyTestState(
                    ip, port, proto, 0,
                    success_count=0, total_duration=0.0,
                    country=None, mitm=0, consecutive_success=0,
                    asn=None, oldies=False,
                    completion_queue=completion_queue,
                    proxy_full=addr, source_proto=pr
                )
                pending_states[proxy_str] = state
                # Pick a random checktype, then a target suited to it.
                checktype = random.choice(checktypes)
                if checktype == 'judges':
                    available = proxywatchd.judge_stats.get_available_judges(
                        list(proxywatchd.judges.keys()))
                    target = random.choice(available) if available else random.choice(
                        list(proxywatchd.judges.keys()))
                elif checktype == 'ssl':
                    target = random.choice(proxywatchd.ssl_targets)
                elif checktype == 'irc':
                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
                else: # head
                    target = random.choice(list(proxywatchd.regexes.keys()))
                job = proxywatchd.TargetTestJob(state, target, checktype)
                all_jobs.append(job)
            random.shuffle(all_jobs)
            for job in all_jobs:
                job_queue.put(job, priority=0)
            # Wait for completion
            completed = 0
            timeout_start = time.time()
            # Timeout scales with batch size so large batches are not cut short.
            timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
            working_results = []
            while completed < len(all_jobs):
                try:
                    state = completion_queue.get(timeout=1)
                    completed += 1
                    # failcount == 0 means the proxy passed its test.
                    if state.failcount == 0:
                        latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
                        proxy_addr = state.proxy
                        if state.auth:
                            proxy_addr = '%s@%s' % (state.auth, state.proxy)
                        working_results.append({
                            'ip': state.ip,
                            'port': state.port,
                            'proto': state.proto,
                            'source_proto': state.source_proto,
                            'latency': round(latency_sec, 3),
                            'exit_ip': state.exit_ip,
                            'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
                            'checktype': state.last_check or '',
                            'target': state.last_target or '',
                        })
                    if completed % 50 == 0 or completed == len(all_jobs):
                        _log('tested %d/%d proxies (%d working)' % (
                            completed, len(all_jobs), len(working_results)), 'info')
                except Queue.Empty:
                    if time.time() - timeout_start > timeout_seconds:
                        _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
                        break
                    continue
            proxies_working += len(working_results)
            # Report working proxies to master
            if working_results:
                try:
                    processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                except NeedReregister:
                    do_register()
                    try:
                        processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding proxy reports', 'error')
                        processed = 0
                _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
            cycles += 1
            time.sleep(1)
    except KeyboardInterrupt:
        # Graceful shutdown (also reached via the SIGTERM handler).
        elapsed = time.time() - start_time
        _log('worker V2 stopping...', 'info')
        session.close()
        for wt in threads:
            wt.stop()
        for wt in threads:
            wt.term()
        _log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
        _log(' cycles: %d' % cycles, 'info')
        _log(' urls fetched: %d' % urls_fetched, 'info')
        _log(' proxies found: %d' % proxies_found, 'info')
        _log(' proxies working: %d' % proxies_working, 'info')
def main():
    """Main entry point.

    Dispatch order:
      --reset              wipe local state (handled by reset_state) and exit
      --worker-v2          URL-driven worker talking to a master server
      --worker/--register  V1 worker talking to a master server
      --file <path>        one-shot proxy import into the local database
      otherwise            standalone mode: proxy watcher (or httpd-only),
                           optional scraper threads, and the URL leech loop.
    """
    global config
    # Handle --reset flag before connecting to databases
    if len(sys.argv) == 2 and sys.argv[1] == "--reset":
        if reset_state():
            _log('use without --reset to start fresh', 'reset')
            sys.exit(0)
        else:
            sys.exit(1)
    # V2 worker mode: URL-driven discovery
    if config.args.worker_v2:
        worker_v2_main(config)
        return
    # V1 worker mode: connect to master server instead of running locally
    if config.args.worker or config.args.register:
        worker_main(config)
        return
    # Standalone mode: open the proxy DB and prime the in-memory known-proxy
    # set so duplicate discoveries can be filtered cheaply below.
    proxydb = mysqlite.mysqlite(config.watchd.database, str)
    dbs.create_table_if_not_exists(proxydb, 'proxylist')
    fetch.init_known_proxies(proxydb)
    # URL ignore patterns. NOTE(review): `urignore` is built but never read in
    # this function -- presumably consumed elsewhere; confirm before removing.
    with open('urignore.txt', 'r') as f:
        urignore = [ i.strip() for i in f.read().split('\n') if i.strip() ]
    # URL database: ensure schema, seed built-in sources, pull user additions.
    urldb = mysqlite.mysqlite(config.ppf.database, str)
    dbs.create_table_if_not_exists(urldb, 'uris')
    dbs.seed_proxy_sources(urldb)
    import_from_file('import.txt', urldb)
    # --file <path>: import proxies once and exit with the import status code.
    if len(sys.argv) == 3 and sys.argv[1] == "--file":
        sys.exit(import_proxies_from_file(proxydb, sys.argv[2]))
    # start proxy watcher (import here to avoid gevent dependency in worker mode)
    import proxywatchd
    httpd_server = None
    if config.watchd.threads > 0:
        watcherd = proxywatchd.Proxywatchd()
        watcherd.start()
    else:
        watcherd = None
        # Start httpd independently when watchd is disabled
        if config.httpd.enabled:
            from httpd import ProxyAPIServer, configure_url_scoring
            import network_stats
            # Share the URL scoring parameters with the API layer so its view
            # of "due" / "stale" lists matches the leech loop's query below.
            configure_url_scoring(
                config.ppf.checktime,
                config.ppf.perfail_checktime,
                config.ppf.max_fail,
                config.ppf.list_max_age_days
            )
            def httpd_stats_provider():
                """Stats provider for httpd-only mode (scraping without testing)."""
                stats = {
                    'network': network_stats.get_stats(),
                }
                # Add scraper stats if available
                try:
                    import scraper
                    scraper_stats = scraper.get_scraper_stats()
                    if scraper_stats:
                        stats['scraper'] = scraper_stats
                except Exception:
                    # Best-effort: stats endpoint must not fail if the scraper
                    # module is absent or errors out.
                    pass
                # Add tor pool stats if available
                try:
                    import connection_pool
                    pool = connection_pool.get_pool()
                    if pool:
                        stats['tor_pool'] = pool.get_stats()
                except Exception:
                    # Same best-effort policy as above.
                    pass
                return stats
            profiling = config.args.profile or config.common.profiling
            httpd_server = ProxyAPIServer(
                config.httpd.listenip,
                config.httpd.port,
                config.watchd.database,
                stats_provider=httpd_stats_provider,
                profiling=profiling,
                url_database=config.ppf.database,
            )
            httpd_server.start()
    # start scraper threads if enabled
    scrapers = []
    if config.scraper.enabled:
        import scraper
        for i in range(config.scraper.threads):
            s = scraper.Scraper(config)
            s.start()
            scrapers.append(s)
        _log('started %d scraper thread(s)' % len(scrapers), 'info')
    # Filter out stale proxy list URLs:
    # - URLs added more than list_max_age_days ago AND never produced proxies are skipped
    # - URLs that have produced proxies before are always included (regardless of age)
    # Due-time formula: check_time + checktime + (error+stale_count)*perfail_checktime,
    # i.e. failing/stale URLs back off linearly with each failure.
    qurl = '''SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash
              FROM uris
              WHERE error < ?
                AND (check_time+?+((error+stale_count)*?) < ?)
                AND (added > ? OR proxies_added > 0)
              ORDER BY RANDOM()'''
    # Query to count skipped old URLs (for logging)
    qurl_skipped = '''SELECT COUNT(*) FROM uris
                      WHERE error < ?
                        AND (check_time+?+((error+stale_count)*?) < ?)
                        AND added <= ?
                        AND proxies_added = 0'''
    threads = []             # live Leechered fetcher threads
    rows = []                # pending URL rows not yet handed to a thread
    reqtime = time.time() - 3600   # last DB query time; backdated to force an immediate query
    statusmsg = time.time()        # last periodic status log
    list_max_age_seconds = config.ppf.list_max_age_days * 86400
    last_skip_log = 0
    while True:
        try:
            # Small random sleep keeps this scheduler loop from busy-spinning.
            time.sleep(random.random()/10)
            if (time.time() - statusmsg) > 180:
                _log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf')
                statusmsg = time.time()
            if not rows:
                # Re-query at most every 3 seconds once the queue drains.
                if (time.time() - reqtime) > 3:
                    now = int(time.time())
                    min_added = now - list_max_age_seconds
                    rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchall()
                    reqtime = time.time()
                    # Log skipped old URLs periodically (every 10 minutes)
                    if (time.time() - last_skip_log) > 600:
                        skipped = urldb.execute(qurl_skipped, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchone()
                        if skipped and skipped[0] > 0:
                            _log('skipping %d old proxy lists (added >%d days ago, no proxies found)' % (skipped[0], config.ppf.list_max_age_days), 'stale')
                        last_skip_log = time.time()
                    if len(rows) < config.ppf.threads:
                        # Too little work to saturate the pool: idle and retry.
                        time.sleep(60)
                        rows = []
                    else:
                        _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
            # Harvest finished fetchers and persist their results.
            for thread in threads:
                if thread.status == 'ok':
                    # NOTE(review): the `execute` value unpacked here is
                    # discarded -- it is rebuilt from scratch below.
                    url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
                    # proxylist is list of (address, proto, confidence) tuples
                    new = [(addr, pr, conf) for addr, pr, conf in proxylist if not fetch.is_known_proxy(addr)]
                    if new:
                        fetch.add_known_proxies([addr for addr, pr, conf in new])
                    # Update content_hash if we have a new one
                    new_hash = thread.new_hash
                    execute = (error, stale_count, int(time.time()), retrievals, proxies_added+len(new), content_type, new_hash, url)
                    urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=?,content_hash=? where url=?', execute)
                    urldb.commit()
                    if new: dbs.insert_proxies(proxydb, new, url)
            # Drop dead threads; retrieve() above has already consumed results.
            threads = [ thread for thread in threads if thread.is_alive() ]
            if len(threads) < config.ppf.threads and rows:
                # Only query proxydb when actually starting a new thread (reduces GIL blocking)
                _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
                if not _proxylist: _proxylist = None
                # Hand the fetcher up to 5 random working proxies to leech through.
                p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
                row = random.choice(rows)
                # Stamp check_time immediately so a crash cannot re-queue the
                # same URL in a tight loop.
                urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))
                urldb.commit()
                rows.remove(row)
                # row: url, stale_count, error, retrievals, proxies_added, content_type, content_hash
                t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5], row[6], p)
                threads.append(t)
                t.start()
        except KeyboardInterrupt:
            # Orderly shutdown: scrapers first, then watcher, then the API.
            for s in scrapers:
                s.stop()
            if watcherd:
                watcherd.stop()
                watcherd.finish()
            if httpd_server:
                httpd_server.stop()
            break
    _log('ppf stopped', 'info')
if __name__ == '__main__':
    # Load and validate configuration before touching anything else.
    config.load()
    problems = config.validate()
    if problems:
        for problem in problems:
            _log(problem, 'error')
        sys.exit(1)
    fetch.set_config(config)
    # Apply command-line flags.
    if config.args.nobs:
        set_nobs(True)
    want_profile = config.args.profile or config.common.profiling
    if not want_profile:
        main()
    else:
        _log('profiling enabled, output to data/profile.stats', 'info')
        # Module-level global: the SIGTERM handler dumps this profiler too.
        _profiler = cProfile.Profile()
        try:
            _profiler.enable()
            main()
        finally:
            # Always persist and print the profile, even on interrupt/error.
            _profiler.disable()
            _profiler.dump_stats('data/profile.stats')
            _log('profile stats written to data/profile.stats', 'info')
            summary = pstats.Stats('data/profile.stats')
            summary.strip_dirs().sort_stats('cumulative').print_stats(20)