ppf: add worker mode with distributed testing
- Add --worker mode for distributed proxy testing
- Workers claim batches from the manager, test via local Tor, and submit results
- Add --register to register new workers with the manager
- Add thread-spawn stagger (0-100 ms) to avoid overwhelming Tor
- Verify Tor connectivity before claiming work
- Add heartbeat and batch-timeout handling
- Track worker profiling state for dashboard display
This commit is contained in:
502
ppf.py
502
ppf.py
@@ -1,20 +1,26 @@
|
|||||||
#!/usr/bin/env python2
|
#!/usr/bin/env python2
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Worker mode requires gevent - must monkey-patch before other imports
|
||||||
|
if '--worker' in sys.argv or '--register' in sys.argv:
|
||||||
|
from gevent import monkey
|
||||||
|
monkey.patch_all()
|
||||||
|
|
||||||
import cProfile
|
import cProfile
|
||||||
import pstats
|
import pstats
|
||||||
import signal
|
import signal
|
||||||
import dbs
|
import dbs
|
||||||
import time
|
import time
|
||||||
import mysqlite
|
import mysqlite
|
||||||
import proxywatchd
|
|
||||||
from misc import _log
|
from misc import _log
|
||||||
from config import Config
|
from config import Config
|
||||||
import fetch
|
import fetch
|
||||||
import sys
|
|
||||||
from soup_parser import set_nobs
|
from soup_parser import set_nobs
|
||||||
import threading
|
import threading
|
||||||
import random
|
import random
|
||||||
import os
|
import json
|
||||||
|
|
||||||
# Global profiler for signal handler access
|
# Global profiler for signal handler access
|
||||||
_profiler = None
|
_profiler = None
|
||||||
@@ -168,8 +174,6 @@ class Leechered(threading.Thread):
|
|||||||
|
|
||||||
def retrieve(self):
|
def retrieve(self):
|
||||||
return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute
|
return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute
|
||||||
def status(self):
|
|
||||||
return self.status
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.status = 'nok'
|
self.status = 'nok'
|
||||||
@@ -267,6 +271,442 @@ class Leechered(threading.Thread):
|
|||||||
self.status = 'nok'
|
self.status = 'nok'
|
||||||
|
|
||||||
|
|
||||||
|
# Worker mode imports (lazy loaded)
|
||||||
|
try:
|
||||||
|
import urllib2
|
||||||
|
import socket
|
||||||
|
except ImportError:
|
||||||
|
urllib2 = None
|
||||||
|
|
||||||
|
|
||||||
|
def worker_register(server_url, name, master_key=''):
    """Register this worker with the master server.

    POSTs {'name', 'master_key'} as JSON to /api/register and returns the
    (worker_id, worker_key) pair from the reply, or (None, None) when the
    request fails for any reason.
    """
    endpoint = '%s/api/register' % server_url.rstrip('/')
    payload = json.dumps({'name': name, 'master_key': master_key})

    request = urllib2.Request(endpoint, payload)
    request.add_header('Content-Type', 'application/json')

    try:
        response = urllib2.urlopen(request, timeout=30)
        body = json.loads(response.read())
        return body.get('worker_id'), body.get('worker_key')
    except Exception as e:
        # Best-effort: any transport or parse error is reported and mapped
        # to a (None, None) result so the caller can retry.
        _log('registration failed: %s' % e, 'error')
        return None, None
|
||||||
|
|
||||||
|
|
||||||
|
class NeedReregister(Exception):
    """Signals that the master rejected our worker key (HTTP 403) and that a
    fresh registration must be performed before the request is retried."""
    pass
|
||||||
|
|
||||||
|
|
||||||
|
def worker_get_work(server_url, worker_key, count=100):
    """Claim a batch of up to *count* proxies from the master.

    Returns the list of proxy dicts from the reply (possibly empty).
    Raises NeedReregister when the master rejects the worker key with
    HTTP 403; any other failure is logged and yields an empty list.
    """
    endpoint = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)

    try:
        response = urllib2.urlopen(endpoint, timeout=30)
        payload = json.loads(response.read())
        return payload.get('proxies', [])
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to get work: %s' % e, 'error')
        return []
    except Exception as e:
        _log('failed to get work: %s' % e, 'error')
        return []
|
||||||
|
|
||||||
|
|
||||||
|
def worker_submit_results(server_url, worker_key, results):
    """Submit a batch of test results to the master.

    POSTs {'results': results} as JSON to /api/results and returns the
    number of results the master reports as processed.  Raises
    NeedReregister when the master rejects the worker key with HTTP 403;
    any other failure is logged and yields 0.
    """
    endpoint = '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key)
    payload = json.dumps({'results': results})

    request = urllib2.Request(endpoint, payload)
    request.add_header('Content-Type', 'application/json')

    try:
        response = urllib2.urlopen(request, timeout=30)
        body = json.loads(response.read())
        return body.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to submit results: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to submit results: %s' % e, 'error')
        return 0
|
||||||
|
|
||||||
|
|
||||||
|
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False):
    """Send a heartbeat with Tor status to the master.

    POSTs {'tor_ok', 'tor_ip', 'profiling'} as JSON to /api/heartbeat.
    Returns True when the master acknowledged the heartbeat, False on any
    transport error.  Raises NeedReregister when the master rejects the
    worker key with HTTP 403.
    """
    url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
    data = json.dumps({
        'tor_ok': tor_ok,
        'tor_ip': tor_ip,
        'profiling': profiling,
    })

    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')

    try:
        # Fix: close the response instead of leaking the connection; the
        # body is never inspected -- a non-error reply counts as an ack.
        resp = urllib2.urlopen(req, timeout=10)
        resp.close()
        return True
    except urllib2.HTTPError as e:
        if e.code == 403:
            raise NeedReregister()
        return False
    except Exception:
        # Heartbeats are best-effort; the caller only needs a boolean.
        return False
|
||||||
|
|
||||||
|
|
||||||
|
def check_tor_connectivity(tor_hosts):
    """Test Tor connectivity through each SOCKS endpoint in *tor_hosts*.

    Each entry is a 'host:port' string.  Fetches /api/ip from
    check.torproject.org through each endpoint and returns
    (working_hosts, tor_ip) where working_hosts is the subset that
    answered with an HTTP response and tor_ip is the exit IP parsed from
    the last responsive host's JSON body (or None if none was parsed).
    """
    # Fix: dropped the unused `import socket`; only PySocks is needed here.
    import socks

    working = []
    tor_ip = None

    for tor_host in tor_hosts:
        host, port = tor_host.split(':')
        port = int(port)
        try:
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(15)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET /api/ip HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(1024)
            test_sock.close()

            if resp and b'HTTP/' in resp:
                working.append(tor_host)
                # Extract IP from JSON response body.
                # NOTE(review): only the first 1024 bytes are read, so the
                # JSON body may be truncated when headers are long; the
                # parse failure below is silently ignored in that case.
                if b'\r\n\r\n' in resp:
                    body = resp.split(b'\r\n\r\n', 1)[1]
                    try:
                        data = json.loads(body)
                        tor_ip = data.get('IP')
                    except Exception:
                        pass
        except Exception:
            # Deliberate best-effort: an endpoint that fails to connect or
            # respond is simply left out of the working list.
            pass

    return working, tor_ip
|
||||||
|
|
||||||
|
|
||||||
|
def worker_main(config):
    """Worker mode main loop - uses proxywatchd multi-threaded testing.

    Registers with the master (if needed), verifies local Tor SOCKS
    endpoints, spawns proxywatchd worker threads, then loops: claim a
    batch from the master, test it, submit the results.  Runs until
    KeyboardInterrupt.
    """
    import json
    global urllib2

    try:
        import Queue
    except ImportError:
        import queue as Queue

    # Import proxywatchd for multi-threaded testing (gevent already patched at top)
    import proxywatchd
    proxywatchd.set_config(config)

    server_url = config.args.server
    if not server_url:
        _log('--server URL required for worker mode', 'error')
        sys.exit(1)

    worker_key = config.args.worker_key
    worker_name = config.args.worker_name or os.uname()[1]
    batch_size = config.worker.batch_size
    num_threads = config.watchd.threads

    # Fix: initialize worker_id so that resuming with a saved --worker-key
    # (register branch skipped) does not raise NameError when building wstate.
    worker_id = None

    # Register if --register flag or no key provided
    if config.args.register or not worker_key:
        _log('registering with master: %s' % server_url, 'info')
        worker_id, worker_key = worker_register(server_url, worker_name)
        if not worker_key:
            _log('registration failed, exiting', 'error')
            sys.exit(1)
        _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
        _log('worker key: %s' % worker_key, 'info')
        _log('save this key with --worker-key for future runs', 'info')

    if config.args.register:
        # Just register and exit
        return

    _log('starting worker mode', 'info')
    _log(' server: %s' % server_url, 'info')
    _log(' threads: %d' % num_threads, 'info')
    _log(' batch size: %d' % batch_size, 'info')
    _log(' tor hosts: %s' % config.common.tor_hosts, 'info')

    # Verify Tor connectivity before claiming work (fix: dropped the unused
    # `import socket`; only PySocks is used below)
    import socks
    working_tor_hosts = []
    for tor_host in config.torhosts:
        host, port = tor_host.split(':')
        port = int(port)
        try:
            # Test SOCKS connection
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(10)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(512)
            test_sock.close()
            # Accept any HTTP response (200, 301, 302, etc.)
            if resp and (b'HTTP/' in resp or len(resp) > 0):
                status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
                _log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
                working_tor_hosts.append(tor_host)
            else:
                _log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn')
        except Exception as e:
            _log('tor host %s:%d failed: %s' % (host, port, e), 'warn')

    if not working_tor_hosts:
        _log('no working Tor hosts, cannot start worker', 'error')
        sys.exit(1)

    _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')

    # Create shared queues for worker threads
    job_queue = proxywatchd.PriorityJobQueue()
    result_queue = Queue.Queue()
    completion_queue = Queue.Queue()

    # Spawn worker threads with stagger to avoid overwhelming Tor
    threads = []
    for i in range(num_threads):
        wt = proxywatchd.WorkerThread('w%d' % i, job_queue, result_queue)
        wt.start_thread()
        threads.append(wt)
        time.sleep(random.random() / 10)  # 0-100ms stagger per thread

    _log('spawned %d worker threads' % len(threads), 'info')

    jobs_completed = 0
    proxies_tested = 0
    start_time = time.time()
    last_tor_check = time.time()
    tor_check_interval = 300  # Check Tor every 5 minutes
    current_tor_ip = None
    consecutive_tor_failures = 0
    worker_profiling = config.args.profile or config.common.profiling
    # Use dict to allow mutation in nested function (Python 2 compatible)
    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}

    def do_register():
        """Register with master, with exponential backoff on failure."""
        while True:
            _log('registering with master: %s' % server_url, 'info')
            new_id, new_key = worker_register(server_url, worker_name)
            if new_key:
                wstate['worker_id'] = new_id
                wstate['worker_key'] = new_key
                wstate['backoff'] = 10  # Reset backoff on success
                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
                return True
            else:
                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
                time.sleep(wstate['backoff'])
                wstate['backoff'] = min(wstate['backoff'] * 2, 300)  # Max 5 min backoff

    def wait_for_tor():
        """Wait for Tor to become available, with exponential backoff."""
        backoff = 10
        while True:
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if working:
                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
                # Send heartbeat to manager
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling)
                except NeedReregister:
                    do_register()
                return working, tor_ip
            _log('tor still down, retrying in %ds' % backoff, 'warn')
            # Send heartbeat with tor_ok=False
            try:
                worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling)
            except NeedReregister:
                do_register()
            time.sleep(backoff)
            backoff = min(backoff * 2, 300)  # Max 5 min backoff

    try:
        while True:
            # Periodic Tor health check
            now = time.time()
            if now - last_tor_check > tor_check_interval:
                working, tor_ip = check_tor_connectivity(config.torhosts)
                last_tor_check = now
                if working:
                    consecutive_tor_failures = 0
                    if tor_ip != current_tor_ip:
                        _log('tor circuit rotated: %s' % tor_ip, 'info')
                        current_tor_ip = tor_ip
                    # Send periodic heartbeat
                    try:
                        worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling)
                    except NeedReregister:
                        do_register()
                else:
                    consecutive_tor_failures += 1
                    _log('tor connectivity failed (consecutive: %d)' % consecutive_tor_failures, 'warn')
                    if consecutive_tor_failures >= 2:
                        _log('tor appears down, pausing work', 'error')
                        try:
                            worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling)
                        except NeedReregister:
                            do_register()
                        working, current_tor_ip = wait_for_tor()
                        consecutive_tor_failures = 0
                        last_tor_check = time.time()

            # Get work from master
            try:
                proxies = worker_get_work(server_url, wstate['worker_key'], batch_size)
            except NeedReregister:
                do_register()
                continue

            if not proxies:
                _log('no work available, sleeping 30s', 'info')
                time.sleep(30)
                continue

            _log('received %d proxies to test' % len(proxies), 'info')

            # Create ProxyTestState and jobs for each proxy
            pending_states = {}
            all_jobs = []

            # Get checktype(s) from config
            checktypes = config.watchd.checktypes

            for proxy_info in proxies:
                ip = proxy_info['ip']
                port = proxy_info['port']
                proto = proxy_info.get('proto', 'http')
                failed = proxy_info.get('failed', 0)
                proxy_str = '%s:%d' % (ip, port)

                # Create state for this proxy
                state = proxywatchd.ProxyTestState(
                    ip, port, proto, failed,
                    success_count=0, total_duration=0.0,
                    country=None, mitm=0, consecutive_success=0,
                    asn=None, num_targets=1, oldies=False,
                    completion_queue=completion_queue,
                    proxy_full=proxy_str
                )
                pending_states[proxy_str] = state

                # Select random checktype
                checktype = random.choice(checktypes)

                # Get target for this checktype
                if checktype == 'judges':
                    available = proxywatchd.judge_stats.get_available_judges(
                        list(proxywatchd.judges.keys()))
                    target = random.choice(available) if available else random.choice(
                        list(proxywatchd.judges.keys()))
                elif checktype == 'ssl':
                    target = random.choice(proxywatchd.ssl_targets)
                elif checktype == 'irc':
                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
                else:  # head
                    target = random.choice(list(proxywatchd.regexes.keys()))

                job = proxywatchd.TargetTestJob(state, target, checktype)
                all_jobs.append(job)

            # Shuffle and queue jobs
            random.shuffle(all_jobs)
            for job in all_jobs:
                job_queue.put(job, priority=0)

            # Wait for all jobs to complete
            completed = 0
            results = []
            timeout_start = time.time()
            timeout_seconds = config.watchd.timeout * 2 + 30  # generous timeout

            while completed < len(proxies):
                try:
                    state = completion_queue.get(timeout=1)
                    completed += 1

                    # Build result from state (failcount == 0 means success)
                    is_working = state.failcount == 0
                    latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
                    result = {
                        'ip': state.ip,
                        'port': state.port,
                        'proto': state.proto,
                        'working': is_working,
                        'latency': round(latency_sec, 3) if is_working else 0,
                        'error': None if is_working else 'failed',
                    }
                    results.append(result)

                    # Progress logging
                    if completed % 20 == 0 or completed == len(proxies):
                        working = sum(1 for r in results if r.get('working'))
                        _log('tested %d/%d proxies (%d working)' % (
                            completed, len(proxies), working), 'info')

                except Queue.Empty:
                    if time.time() - timeout_start > timeout_seconds:
                        _log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn')
                        break
                    continue

            # Submit results
            try:
                processed = worker_submit_results(server_url, wstate['worker_key'], results)
            except NeedReregister:
                do_register()
                # Retry submission with new key
                try:
                    processed = worker_submit_results(server_url, wstate['worker_key'], results)
                except NeedReregister:
                    _log('still rejected after re-register, discarding batch', 'error')
                    processed = 0

            jobs_completed += 1
            proxies_tested += len(results)

            working = sum(1 for r in results if r.get('working'))
            _log('batch %d: %d/%d working, submitted %d' % (
                jobs_completed, working, len(results), processed), 'info')

            # Brief pause between batches
            time.sleep(1)

    except KeyboardInterrupt:
        elapsed = time.time() - start_time
        _log('worker stopping...', 'info')
        # Stop threads
        for wt in threads:
            wt.stop()
        for wt in threads:
            wt.term()
        _log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
        _log(' jobs completed: %d' % jobs_completed, 'info')
        _log(' proxies tested: %d' % proxies_tested, 'info')
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main entry point."""
|
"""Main entry point."""
|
||||||
global config
|
global config
|
||||||
@@ -279,6 +719,11 @@ def main():
|
|||||||
else:
|
else:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Worker mode: connect to master server instead of running locally
|
||||||
|
if config.args.worker or config.args.register:
|
||||||
|
worker_main(config)
|
||||||
|
return
|
||||||
|
|
||||||
proxydb = mysqlite.mysqlite(config.watchd.database, str)
|
proxydb = mysqlite.mysqlite(config.watchd.database, str)
|
||||||
dbs.create_table_if_not_exists(proxydb, 'proxylist')
|
dbs.create_table_if_not_exists(proxydb, 'proxylist')
|
||||||
fetch.init_known_proxies(proxydb)
|
fetch.init_known_proxies(proxydb)
|
||||||
@@ -293,12 +738,25 @@ def main():
|
|||||||
if len(sys.argv) == 3 and sys.argv[1] == "--file":
|
if len(sys.argv) == 3 and sys.argv[1] == "--file":
|
||||||
sys.exit(import_proxies_from_file(proxydb, sys.argv[2]))
|
sys.exit(import_proxies_from_file(proxydb, sys.argv[2]))
|
||||||
|
|
||||||
# start proxy watcher
|
# start proxy watcher (import here to avoid gevent dependency in worker mode)
|
||||||
|
import proxywatchd
|
||||||
|
httpd_server = None
|
||||||
if config.watchd.threads > 0:
|
if config.watchd.threads > 0:
|
||||||
watcherd = proxywatchd.Proxywatchd()
|
watcherd = proxywatchd.Proxywatchd()
|
||||||
watcherd.start()
|
watcherd.start()
|
||||||
else:
|
else:
|
||||||
watcherd = None
|
watcherd = None
|
||||||
|
# Start httpd independently when watchd is disabled
|
||||||
|
if config.httpd.enabled:
|
||||||
|
from httpd import ProxyAPIServer
|
||||||
|
profiling = config.args.profile or config.common.profiling
|
||||||
|
httpd_server = ProxyAPIServer(
|
||||||
|
config.httpd.listenip,
|
||||||
|
config.httpd.port,
|
||||||
|
config.watchd.database,
|
||||||
|
profiling=profiling,
|
||||||
|
)
|
||||||
|
httpd_server.start()
|
||||||
|
|
||||||
# start scraper threads if enabled
|
# start scraper threads if enabled
|
||||||
scrapers = []
|
scrapers = []
|
||||||
@@ -310,11 +768,27 @@ def main():
|
|||||||
scrapers.append(s)
|
scrapers.append(s)
|
||||||
_log('started %d scraper thread(s)' % len(scrapers), 'info')
|
_log('started %d scraper thread(s)' % len(scrapers), 'info')
|
||||||
|
|
||||||
qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) <?) ORDER BY RANDOM()'
|
# Filter out stale proxy list URLs:
|
||||||
|
# - URLs added more than list_max_age_days ago AND never produced proxies are skipped
|
||||||
|
# - URLs that have produced proxies before are always included (regardless of age)
|
||||||
|
qurl = '''SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash
|
||||||
|
FROM uris
|
||||||
|
WHERE error < ?
|
||||||
|
AND (check_time+?+((error+stale_count)*?) < ?)
|
||||||
|
AND (added > ? OR proxies_added > 0)
|
||||||
|
ORDER BY RANDOM()'''
|
||||||
|
# Query to count skipped old URLs (for logging)
|
||||||
|
qurl_skipped = '''SELECT COUNT(*) FROM uris
|
||||||
|
WHERE error < ?
|
||||||
|
AND (check_time+?+((error+stale_count)*?) < ?)
|
||||||
|
AND added <= ?
|
||||||
|
AND proxies_added = 0'''
|
||||||
threads = []
|
threads = []
|
||||||
rows = []
|
rows = []
|
||||||
reqtime = time.time() - 3600
|
reqtime = time.time() - 3600
|
||||||
statusmsg = time.time()
|
statusmsg = time.time()
|
||||||
|
list_max_age_seconds = config.ppf.list_max_age_days * 86400
|
||||||
|
last_skip_log = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
time.sleep(random.random()/10)
|
time.sleep(random.random()/10)
|
||||||
@@ -323,15 +797,23 @@ def main():
|
|||||||
statusmsg = time.time()
|
statusmsg = time.time()
|
||||||
if not rows:
|
if not rows:
|
||||||
if (time.time() - reqtime) > 3:
|
if (time.time() - reqtime) > 3:
|
||||||
rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, int(time.time()))).fetchall()
|
now = int(time.time())
|
||||||
|
min_added = now - list_max_age_seconds
|
||||||
|
rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchall()
|
||||||
reqtime = time.time()
|
reqtime = time.time()
|
||||||
|
# Log skipped old URLs periodically (every 10 minutes)
|
||||||
|
if (time.time() - last_skip_log) > 600:
|
||||||
|
skipped = urldb.execute(qurl_skipped, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchone()
|
||||||
|
if skipped and skipped[0] > 0:
|
||||||
|
_log('skipping %d old proxy lists (added >%d days ago, no proxies found)' % (skipped[0], config.ppf.list_max_age_days), 'stale')
|
||||||
|
last_skip_log = time.time()
|
||||||
if len(rows) < config.ppf.threads:
|
if len(rows) < config.ppf.threads:
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
rows = []
|
rows = []
|
||||||
else:
|
else:
|
||||||
_log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
|
_log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
|
||||||
|
|
||||||
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute('SELECT proto,proxy from proxylist where failed=0').fetchall() ]
|
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
|
||||||
if not _proxylist: _proxylist = None
|
if not _proxylist: _proxylist = None
|
||||||
|
|
||||||
for thread in threads:
|
for thread in threads:
|
||||||
@@ -366,6 +848,8 @@ def main():
|
|||||||
if watcherd:
|
if watcherd:
|
||||||
watcherd.stop()
|
watcherd.stop()
|
||||||
watcherd.finish()
|
watcherd.finish()
|
||||||
|
if httpd_server:
|
||||||
|
httpd_server.stop()
|
||||||
break
|
break
|
||||||
|
|
||||||
_log('ppf stopped', 'info')
|
_log('ppf stopped', 'info')
|
||||||
|
|||||||
Reference in New Issue
Block a user