proxywatchd: implement multi-target validation with work-stealing queue
This commit is contained in:
496
proxywatchd.py
496
proxywatchd.py
@@ -2,6 +2,7 @@
|
||||
|
||||
import threading
|
||||
import time, random, string, re, copy
|
||||
import Queue
|
||||
try:
|
||||
import IP2Location
|
||||
import os
|
||||
@@ -88,8 +89,14 @@ def socks4_resolve(srvname, server_port):
|
||||
return srv
|
||||
|
||||
|
||||
class WorkerJob():
|
||||
def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, oldies = False):
|
||||
class ProxyTestState():
|
||||
"""Thread-safe state for a proxy being tested against multiple targets.
|
||||
|
||||
Results from TargetTestJob instances are aggregated here.
|
||||
When all tests complete, evaluate() determines final pass/fail.
|
||||
"""
|
||||
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
|
||||
country, mitm, consecutive_success, num_targets=3, oldies=False):
|
||||
self.ip = ip
|
||||
self.port = int(port)
|
||||
self.proxy = '%s:%s' % (ip, port)
|
||||
@@ -102,85 +109,29 @@ class WorkerJob():
|
||||
self.mitm = mitm
|
||||
self.consecutive_success = consecutive_success
|
||||
self.isoldies = oldies
|
||||
self.num_targets = num_targets
|
||||
|
||||
def connect_socket(self, checktype):
|
||||
# thread-safe result accumulation
|
||||
self.lock = threading.Lock()
|
||||
self.results = [] # list of (success, proto, duration, srv, tor, ssl)
|
||||
self.completed = False
|
||||
|
||||
if checktype == 'irc':
|
||||
srvname = random.choice(config.servers).strip()
|
||||
use_ssl = random.choice([0,1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
|
||||
if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0: use_ssl = 1
|
||||
server_port = 6697 if use_ssl else 6667
|
||||
elif checktype == 'http':
|
||||
srvname = random.choice( [ i for i in regexes.keys() ])
|
||||
use_ssl = random.choice([0,1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
|
||||
if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0: use_ssl = 1
|
||||
server_port = 443 if use_ssl else 80
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None):
|
||||
"""Record a single target test result. Thread-safe."""
|
||||
with self.lock:
|
||||
self.results.append({
|
||||
'success': success,
|
||||
'proto': proto,
|
||||
'duration': duration,
|
||||
'srv': srv,
|
||||
'tor': tor,
|
||||
'ssl': ssl
|
||||
})
|
||||
|
||||
verifycert = True if use_ssl else False
|
||||
protos = ['http', 'socks5', 'socks4'] if self.proto is None else [self.proto]
|
||||
|
||||
fail_inc = 1
|
||||
|
||||
for proto in protos:
|
||||
torhost = random.choice(config.torhosts)
|
||||
# socks4 (without 4a) requires a raw ip address
|
||||
# rocksock automatically resolves if needed, but it's more
|
||||
# efficient to cache the result.
|
||||
if proto == 'socks4': srv = socks4_resolve(srvname, server_port)
|
||||
else: srv = srvname
|
||||
## skip socks4 failed resolution
|
||||
if not srv: continue
|
||||
|
||||
duration = time.time()
|
||||
proxies = [
|
||||
rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
|
||||
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, self.ip, self.port)),
|
||||
]
|
||||
|
||||
try:
|
||||
sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, proxies=proxies, timeout=config.watchd.timeout, verifycert=verifycert)
|
||||
sock.connect()
|
||||
if checktype == 'irc':
|
||||
sock.send('NICK\n')
|
||||
elif checktype == 'http':
|
||||
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
||||
return sock, proto, duration, torhost, srvname, 0, use_ssl
|
||||
except rocksock.RocksockException as e:
|
||||
if config.watchd.debug:
|
||||
_log("proxy failed: %s://%s:%d: %s"%(proto, self.ip, self.port, e.get_errormessage()), 'debug')
|
||||
|
||||
et = e.get_errortype()
|
||||
err = e.get_error()
|
||||
fp = e.get_failedproxy()
|
||||
|
||||
sock.disconnect()
|
||||
|
||||
if et == rocksock.RS_ET_OWN:
|
||||
if fp == 1 and \
|
||||
err == rocksock.RS_E_REMOTE_DISCONNECTED or \
|
||||
err == rocksock.RS_E_HIT_TIMEOUT:
|
||||
# proxy is not online, so don't waste time trying all possible protocols
|
||||
break
|
||||
elif fp == 0 and \
|
||||
err == rocksock.RS_E_TARGET_CONN_REFUSED:
|
||||
fail_inc = 0
|
||||
if random.randint(0, (config.watchd.threads-1)/2) == 0:
|
||||
_log("could not connect to proxy 0, sleep 5s", "ERROR")
|
||||
time.sleep(5)
|
||||
elif et == rocksock.RS_ET_GAI:
|
||||
assert(0)
|
||||
fail_inc = 0
|
||||
_log("could not resolve connection target %s"%srvname, "ERROR")
|
||||
break
|
||||
|
||||
elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
|
||||
fail_inc = 0
|
||||
self.mitm = 1
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
raise(e)
|
||||
|
||||
return None, None, None, None, None, fail_inc, use_ssl
|
||||
def is_complete(self):
|
||||
"""Check if all target tests have finished."""
|
||||
with self.lock:
|
||||
return len(self.results) >= self.num_targets
|
||||
|
||||
def rwip(self, ip):
|
||||
n = []
|
||||
@@ -189,119 +140,207 @@ class WorkerJob():
|
||||
n.append(b)
|
||||
return '.'.join(n)
|
||||
|
||||
def evaluate(self):
|
||||
"""Evaluate results after all tests complete. Returns True if proxy is valid."""
|
||||
with self.lock:
|
||||
if self.completed:
|
||||
return self.failcount == 0
|
||||
self.completed = True
|
||||
self.checktime = int(time.time())
|
||||
|
||||
def run(self):
|
||||
self.checktime = int(time.time())
|
||||
checktype = config.watchd.checktype
|
||||
successes = [r for r in self.results if r['success']]
|
||||
num_success = len(successes)
|
||||
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl = self.connect_socket(checktype)
|
||||
if not sock:
|
||||
self.failcount += failinc
|
||||
return
|
||||
try:
|
||||
recv = sock.recv(-1)
|
||||
|
||||
regex = '^(:|NOTICE|ERROR)' if checktype == 'irc' else regexes[srv]
|
||||
# good data
|
||||
if re.search(regex, recv, re.IGNORECASE):
|
||||
duration = (time.time() - duration)
|
||||
# require majority success (2/3)
|
||||
if num_success >= 2:
|
||||
# use last successful result for metrics
|
||||
last_good = successes[-1]
|
||||
|
||||
if geolite and self.country is None:
|
||||
self.ip = self.rwip(self.ip)
|
||||
rec = geodb.get_all(self.ip)
|
||||
if rec is not None and rec.country_short: self.country = rec.country_short
|
||||
if rec is not None and rec.country_short:
|
||||
self.country = rec.country_short
|
||||
|
||||
self.proto = proto
|
||||
self.proto = last_good['proto']
|
||||
self.failcount = 0
|
||||
if (self.consecutive_success %3) == 0: self.mitm = 0
|
||||
self.consecutive_success = self.consecutive_success + 1
|
||||
self.success_count = self.success_count + 1
|
||||
self.total_duration += int(duration*1000)
|
||||
torstats = "" if len(config.torhosts)==1 else ' tor: %s;'%tor
|
||||
recvstats = "".join([x if x in string.printable and ord(x) > 32 else '.' for x in recv])
|
||||
_log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; recv: %s' % (proto, self.ip, self.port, self.country, duration, torstats, srv, str(is_ssl), recvstats[:50]), 'xxxxx')
|
||||
if (self.consecutive_success % 3) == 0:
|
||||
self.mitm = 0
|
||||
self.consecutive_success += 1
|
||||
self.success_count += 1
|
||||
self.total_duration += int(last_good['duration'] * 1000)
|
||||
|
||||
torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
|
||||
_log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
|
||||
last_good['proto'], self.ip, self.port, self.country,
|
||||
last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']),
|
||||
num_success, self.num_targets), 'xxxxx')
|
||||
return True
|
||||
|
||||
elif num_success == 1:
|
||||
# partial success - don't increment fail, but reset consecutive
|
||||
self.consecutive_success = 0
|
||||
_log('%s:%d partial success %d/%d targets' % (
|
||||
self.ip, self.port, num_success, self.num_targets), 'debug')
|
||||
return False
|
||||
|
||||
else:
|
||||
self.failcount += 1
|
||||
self.consecutive_success = 0
|
||||
return False
|
||||
|
||||
|
||||
class TargetTestJob():
|
||||
"""Job to test a single proxy against a single target.
|
||||
|
||||
Multiple TargetTestJob instances share the same ProxyTestState,
|
||||
allowing tests to be interleaved with other proxies in the queue.
|
||||
"""
|
||||
def __init__(self, proxy_state, target_srv, checktype):
|
||||
self.proxy_state = proxy_state
|
||||
self.target_srv = target_srv
|
||||
self.checktype = checktype
|
||||
|
||||
def run(self):
|
||||
"""Test the proxy against this job's target server."""
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl = self._connect_and_test()
|
||||
|
||||
if not sock:
|
||||
self.proxy_state.record_result(False)
|
||||
return
|
||||
|
||||
try:
|
||||
recv = sock.recv(-1)
|
||||
regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv]
|
||||
|
||||
if re.search(regex, recv, re.IGNORECASE):
|
||||
elapsed = time.time() - duration
|
||||
self.proxy_state.record_result(
|
||||
True, proto=proto, duration=elapsed,
|
||||
srv=srv, tor=tor, ssl=is_ssl
|
||||
)
|
||||
else:
|
||||
self.proxy_state.record_result(False)
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
sock.disconnect()
|
||||
raise e
|
||||
except rocksock.RocksockException as e:
|
||||
self.failcount += 1
|
||||
self.consecutive_success = 0
|
||||
except rocksock.RocksockException:
|
||||
self.proxy_state.record_result(False)
|
||||
finally:
|
||||
sock.disconnect()
|
||||
|
||||
def _connect_and_test(self):
|
||||
"""Connect to target through the proxy and send test packet."""
|
||||
ps = self.proxy_state
|
||||
srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
|
||||
|
||||
use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
|
||||
if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
|
||||
use_ssl = 1
|
||||
|
||||
if self.checktype == 'irc':
|
||||
server_port = 6697 if use_ssl else 6667
|
||||
else:
|
||||
server_port = 443 if use_ssl else 80
|
||||
|
||||
verifycert = True if use_ssl else False
|
||||
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
|
||||
|
||||
for proto in protos:
|
||||
torhost = random.choice(config.torhosts)
|
||||
if proto == 'socks4':
|
||||
srv = socks4_resolve(srvname, server_port)
|
||||
else:
|
||||
srv = srvname
|
||||
if not srv:
|
||||
continue
|
||||
|
||||
duration = time.time()
|
||||
proxies = [
|
||||
rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
|
||||
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
|
||||
]
|
||||
|
||||
try:
|
||||
sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
|
||||
proxies=proxies, timeout=config.watchd.timeout,
|
||||
verifycert=verifycert)
|
||||
sock.connect()
|
||||
if self.checktype == 'irc':
|
||||
sock.send('NICK\n')
|
||||
else:
|
||||
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
||||
return sock, proto, duration, torhost, srvname, 0, use_ssl
|
||||
|
||||
except rocksock.RocksockException as e:
|
||||
if config.watchd.debug:
|
||||
_log("proxy failed: %s://%s:%d: %s" % (proto, ps.ip, ps.port,
|
||||
e.get_errormessage()), 'debug')
|
||||
|
||||
et = e.get_errortype()
|
||||
err = e.get_error()
|
||||
fp = e.get_failedproxy()
|
||||
|
||||
sock.disconnect()
|
||||
|
||||
if et == rocksock.RS_ET_OWN:
|
||||
if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
|
||||
err == rocksock.RS_E_HIT_TIMEOUT):
|
||||
break
|
||||
elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
|
||||
if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
|
||||
_log("could not connect to tor, sleep 5s", "ERROR")
|
||||
time.sleep(5)
|
||||
elif et == rocksock.RS_ET_GAI:
|
||||
_log("could not resolve connection target %s" % srvname, "ERROR")
|
||||
break
|
||||
elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
|
||||
ps.mitm = 1
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
|
||||
return None, None, None, None, None, 1, use_ssl
|
||||
|
||||
|
||||
class WorkerThread():
|
||||
def __init__ (self, id):
|
||||
def __init__(self, id, job_queue, result_queue):
|
||||
self.id = id
|
||||
self.done = threading.Event()
|
||||
self.thread = None
|
||||
self.workqueue = []
|
||||
self.workdone = []
|
||||
self.lock = threading.Lock()
|
||||
self.job_queue = job_queue # shared input queue
|
||||
self.result_queue = result_queue # shared output queue
|
||||
def stop(self):
|
||||
self.done.set()
|
||||
def term(self):
|
||||
if self.thread: self.thread.join()
|
||||
def add_jobs(self, jobs):
|
||||
with self.lock:
|
||||
self.workqueue.extend(jobs)
|
||||
def return_jobs(self):
|
||||
with self.lock:
|
||||
jobs = self.workqueue
|
||||
self.workqueue = []
|
||||
return jobs
|
||||
def jobcount(self):
|
||||
return len(self.workqueue)
|
||||
def collect(self):
|
||||
wd = copy.copy(self.workdone)
|
||||
self.workdone = []
|
||||
return wd
|
||||
def start_thread(self):
|
||||
self.thread = threading.Thread(target=self.workloop)
|
||||
self.thread.start()
|
||||
def pop_if_possible(self):
|
||||
with self.lock:
|
||||
if len(self.workqueue):
|
||||
job = self.workqueue.pop()
|
||||
else:
|
||||
job = None
|
||||
return job
|
||||
def workloop(self):
|
||||
success_count = 0
|
||||
job_count = 0
|
||||
duration_total = 0
|
||||
duration_success_total = 0
|
||||
while True:
|
||||
job = self.pop_if_possible()
|
||||
if job:
|
||||
nao = time.time()
|
||||
job.run()
|
||||
spent = time.time() - nao
|
||||
if job.failcount == 0:
|
||||
duration_success_total += spent
|
||||
success_count += 1
|
||||
job_count += 1
|
||||
duration_total += spent
|
||||
self.workdone.append(job)
|
||||
elif not self.thread:
|
||||
break
|
||||
if self.done.is_set(): break
|
||||
time.sleep( random.random() / 100)
|
||||
if self.thread:
|
||||
succ_rate = try_div(success_count, job_count)*100
|
||||
avg_succ_t = try_div(duration_success_total, success_count)
|
||||
avg_fail_t = try_div(duration_total-duration_success_total, job_count-success_count)
|
||||
while not self.done.is_set():
|
||||
try:
|
||||
job = self.job_queue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
continue
|
||||
nao = time.time()
|
||||
job.run()
|
||||
spent = time.time() - nao
|
||||
job_count += 1
|
||||
duration_total += spent
|
||||
self.result_queue.put(job)
|
||||
self.job_queue.task_done()
|
||||
if self.thread and job_count > 0:
|
||||
avg_t = try_div(duration_total, job_count)
|
||||
_log("terminated, %d/%d (%.2f%%), avg.time S/F/T %.2f, %.2f, %.2f" \
|
||||
% (success_count, job_count, succ_rate, avg_succ_t, avg_fail_t, avg_t) \
|
||||
, self.id)
|
||||
_log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)
|
||||
|
||||
class Proxywatchd():
|
||||
|
||||
def stop(self):
|
||||
_log('halting... (%d thread(s))' % len([item for item in self.threads if True]), 'watchd')
|
||||
_log('halting... (%d thread(s))' % len(self.threads), 'watchd')
|
||||
self.stopping.set()
|
||||
|
||||
def _cleanup(self):
|
||||
@@ -332,6 +371,14 @@ class Proxywatchd():
|
||||
self.stopping = threading.Event()
|
||||
self.stopped = threading.Event()
|
||||
|
||||
# shared work-stealing queues
|
||||
self.job_queue = Queue.Queue()
|
||||
self.result_queue = Queue.Queue()
|
||||
|
||||
# track pending proxy states (for multi-target aggregation)
|
||||
self.pending_states = [] # list of ProxyTestState awaiting completion
|
||||
self.pending_lock = threading.Lock()
|
||||
|
||||
# create table if needed
|
||||
self._prep_db()
|
||||
self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, success_count INT, total_duration INT, ip TEXT, port INT)')
|
||||
@@ -339,8 +386,7 @@ class Proxywatchd():
|
||||
self._close_db()
|
||||
|
||||
self.submit_after = config.watchd.submit_after # number of collected jobs before writing db
|
||||
self.jobs = []
|
||||
self.collected = []
|
||||
self.collected = [] # completed ProxyTestState objects ready for DB
|
||||
self.totals = {
|
||||
'submitted':0,
|
||||
'success':0,
|
||||
@@ -365,22 +411,88 @@ class Proxywatchd():
|
||||
## enable tor safeguard by default
|
||||
self.tor_safeguard = config.watchd.tor_safeguard
|
||||
rows = self.fetch_rows()
|
||||
#print('preparing jobbs, oldies: %s' % str(self.isoldies))
|
||||
checktype = config.watchd.checktype
|
||||
num_targets = 3
|
||||
|
||||
# select target pool based on checktype
|
||||
if checktype == 'irc':
|
||||
target_pool = config.servers
|
||||
else:
|
||||
target_pool = list(regexes.keys())
|
||||
|
||||
# create all jobs first, then shuffle for interleaving
|
||||
all_jobs = []
|
||||
new_states = []
|
||||
|
||||
for row in rows:
|
||||
job = WorkerJob(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], self.isoldies)
|
||||
self.jobs.append(job)
|
||||
# create shared state for this proxy
|
||||
state = ProxyTestState(
|
||||
row[0], row[1], row[2], row[3], row[4], row[5],
|
||||
row[6], row[7], row[8], num_targets=num_targets,
|
||||
oldies=self.isoldies
|
||||
)
|
||||
new_states.append(state)
|
||||
|
||||
# select random targets for this proxy
|
||||
targets = random.sample(target_pool, min(num_targets, len(target_pool)))
|
||||
|
||||
# create one job per target
|
||||
for target in targets:
|
||||
job = TargetTestJob(state, target, checktype)
|
||||
all_jobs.append(job)
|
||||
|
||||
# shuffle to interleave tests across different proxies
|
||||
random.shuffle(all_jobs)
|
||||
|
||||
# track pending states
|
||||
with self.pending_lock:
|
||||
self.pending_states.extend(new_states)
|
||||
|
||||
# queue all jobs
|
||||
for job in all_jobs:
|
||||
self.job_queue.put(job)
|
||||
|
||||
self._close_db()
|
||||
proxy_count = len(new_states)
|
||||
job_count = len(all_jobs)
|
||||
if proxy_count > 0:
|
||||
_log("created %d jobs for %d proxies (%d targets each)" % (
|
||||
job_count, proxy_count, num_targets), 'watchd')
|
||||
return job_count
|
||||
|
||||
def collect_work(self):
|
||||
for wt in self.threads:
|
||||
self.collected.extend(wt.collect())
|
||||
# drain results from shared result queue (TargetTestJob objects)
|
||||
# results are already recorded in their ProxyTestState
|
||||
while True:
|
||||
try:
|
||||
self.result_queue.get_nowait()
|
||||
except Queue.Empty:
|
||||
break
|
||||
|
||||
# check for completed proxy states and evaluate them
|
||||
with self.pending_lock:
|
||||
still_pending = []
|
||||
for state in self.pending_states:
|
||||
if state.is_complete():
|
||||
state.evaluate()
|
||||
self.collected.append(state)
|
||||
else:
|
||||
still_pending.append(state)
|
||||
self.pending_states = still_pending
|
||||
|
||||
def collect_unfinished(self):
|
||||
for wt in self.threads:
|
||||
jobs = wt.return_jobs()
|
||||
self.jobs.extend(jobs)
|
||||
if len(self.jobs):
|
||||
_log("collected %d unfinished jobs"%len(self.jobs), "watchd")
|
||||
# drain any remaining jobs from job queue
|
||||
unfinished_count = 0
|
||||
while True:
|
||||
try:
|
||||
self.job_queue.get_nowait()
|
||||
unfinished_count += 1
|
||||
except Queue.Empty:
|
||||
break
|
||||
if unfinished_count > 0:
|
||||
_log("discarded %d unfinished jobs" % unfinished_count, "watchd")
|
||||
# note: corresponding ProxyTestStates will be incomplete
|
||||
# they'll be re-tested in the next cycle
|
||||
|
||||
def submit_collected(self):
|
||||
if len(self.collected) == 0: return True
|
||||
@@ -430,66 +542,50 @@ class Proxywatchd():
|
||||
def _run(self):
|
||||
_log('starting...', 'watchd')
|
||||
|
||||
# create worker threads with shared queues
|
||||
for i in range(config.watchd.threads):
|
||||
threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] )
|
||||
wt = WorkerThread(threadid)
|
||||
threadid = ''.join([random.choice(string.letters) for x in range(5)])
|
||||
wt = WorkerThread(threadid, self.job_queue, self.result_queue)
|
||||
if self.in_background:
|
||||
wt.start_thread()
|
||||
self.threads.append(wt)
|
||||
#time.sleep( (random.random()/100) )
|
||||
time.sleep( (random.random()/10) )
|
||||
time.sleep(random.random() / 10)
|
||||
|
||||
sleeptime = 0
|
||||
while True:
|
||||
|
||||
if self.stopping.is_set():
|
||||
print('stopping is_set')
|
||||
if self.in_background: self._cleanup()
|
||||
break
|
||||
|
||||
if sleeptime == 0:
|
||||
sleeptime = 1
|
||||
else:
|
||||
if sleeptime > 0:
|
||||
time.sleep(1)
|
||||
sleeptime -= 1
|
||||
continue
|
||||
|
||||
if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0:
|
||||
self.collect_unfinished()
|
||||
if not len(self.jobs):
|
||||
self.collect_work()
|
||||
if not self.submit_collected() and self.tor_safeguard:
|
||||
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
|
||||
self.collect_unfinished()
|
||||
sleeptime = 1*60
|
||||
else:
|
||||
self.prepare_jobs()
|
||||
# check if job queue is empty (work-stealing: threads pull as needed)
|
||||
if self.job_queue.empty():
|
||||
self.collect_work()
|
||||
if not self.submit_collected() and self.tor_safeguard:
|
||||
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
|
||||
sleeptime = 60
|
||||
else:
|
||||
if len(self.jobs) < len(self.threads):
|
||||
# allow threads enough time to consume the jobs
|
||||
job_count = self.prepare_jobs()
|
||||
if job_count == 0:
|
||||
# no jobs available, wait before checking again
|
||||
sleeptime = 10
|
||||
#if len(self.jobs) >= len(self.threads):
|
||||
if len(self.jobs):
|
||||
_log("handing out %d jobs to %d thread(s)"% (len(self.jobs), len(self.threads)), 'watchd')
|
||||
jpt = len(self.jobs)/len(self.threads)
|
||||
if len(self.jobs)/float(len(self.threads)) - jpt > 0.0: jpt += 1
|
||||
for tid in xrange(len(self.threads)):
|
||||
self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
|
||||
self.jobs = []
|
||||
|
||||
if not self.in_background: # single_thread scenario
|
||||
if not self.in_background: # single_thread scenario
|
||||
self.threads[0].workloop()
|
||||
|
||||
self.collect_work()
|
||||
|
||||
if len(self.collected) > self.submit_after:
|
||||
if not self.submit_collected() and self.tor_safeguard:
|
||||
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
|
||||
self.collect_unfinished()
|
||||
sleeptime = 1*60
|
||||
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
|
||||
sleeptime = 60
|
||||
|
||||
time.sleep(1)
|
||||
sleeptime -= 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user