watchd: first halfbaked attempt to detect/combat tor issues
This commit is contained in:
@@ -228,15 +228,32 @@ class Proxywatchd():
|
|||||||
_log("collected %d unfinished jobs"%len(self.jobs), "watchd")
|
_log("collected %d unfinished jobs"%len(self.jobs), "watchd")
|
||||||
|
|
||||||
def submit_collected(self):
|
def submit_collected(self):
|
||||||
if len(self.collected) == 0: return
|
if len(self.collected) == 0: return True
|
||||||
_log("updating %d DB entries"%len(self.collected), 'watchd')
|
sc = 0
|
||||||
|
args = []
|
||||||
|
for job in self.collected:
|
||||||
|
if job.failcount == 0: sc += 1
|
||||||
|
args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) )
|
||||||
|
|
||||||
|
success_rate = (float(sc) / len(self.collected)) * 100
|
||||||
|
ret = True
|
||||||
|
if len(self.collected) >= 100 and success_rate <= 2.0:
|
||||||
|
_log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR")
|
||||||
|
if sc == 0: return False
|
||||||
|
args = []
|
||||||
|
for job in self.collected:
|
||||||
|
if job.failcount == 0:
|
||||||
|
args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) )
|
||||||
|
ret = False
|
||||||
|
|
||||||
|
_log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd')
|
||||||
self._prep_db()
|
self._prep_db()
|
||||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=? WHERE proxy=?'
|
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=? WHERE proxy=?'
|
||||||
args = [ (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) for job in self.collected ]
|
|
||||||
self.mysqlite.executemany(query, args)
|
self.mysqlite.executemany(query, args)
|
||||||
self.mysqlite.commit()
|
self.mysqlite.commit()
|
||||||
self._close_db()
|
self._close_db()
|
||||||
self.collected = []
|
self.collected = []
|
||||||
|
return ret
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if config.watchd_threads == 1 and _run_standalone:
|
if config.watchd_threads == 1 and _run_standalone:
|
||||||
@@ -264,29 +281,40 @@ class Proxywatchd():
|
|||||||
self.threads.append(wt)
|
self.threads.append(wt)
|
||||||
time.sleep( (random.random()/100) )
|
time.sleep( (random.random()/100) )
|
||||||
|
|
||||||
|
sleeptime = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
if self.stopping.is_set():
|
if self.stopping.is_set():
|
||||||
if self.in_background: self._cleanup()
|
if self.in_background: self._cleanup()
|
||||||
break
|
break
|
||||||
|
|
||||||
sleeptime = 1
|
if sleeptime == 0:
|
||||||
|
sleeptime = 1
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
sleeptime -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0:
|
if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0:
|
||||||
self.collect_unfinished()
|
self.collect_unfinished()
|
||||||
if not len(self.jobs):
|
if not len(self.jobs):
|
||||||
self.collect_work()
|
self.collect_work()
|
||||||
self.submit_collected()
|
if not self.submit_collected():
|
||||||
self.prepare_jobs()
|
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
|
||||||
|
self.collect_unfinished()
|
||||||
|
sleeptime = 1*60
|
||||||
|
else:
|
||||||
|
self.prepare_jobs()
|
||||||
else:
|
else:
|
||||||
if len(self.jobs) < len(self.threads):
|
if len(self.jobs) < len(self.threads):
|
||||||
# allow threads enough time to consume the jobs
|
# allow threads enough time to consume the jobs
|
||||||
sleeptime = 10
|
sleeptime = 10
|
||||||
if len(self.jobs):
|
if len(self.jobs):
|
||||||
_log("handing out %d jobs to %d thread(s)"% (len(self.jobs), config.watchd_threads), 'watchd')
|
_log("handing out %d jobs to %d thread(s)"% (len(self.jobs), len(self.threads)), 'watchd')
|
||||||
jpt = len(self.jobs)/config.watchd_threads
|
jpt = len(self.jobs)/len(self.threads)
|
||||||
if len(self.jobs)/float(config.watchd_threads) - jpt > 0.0: jpt += 1
|
if len(self.jobs)/float(len(self.threads)) - jpt > 0.0: jpt += 1
|
||||||
for tid in range(config.watchd_threads):
|
for tid in xrange(len(self.threads)):
|
||||||
self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
|
self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
|
||||||
self.jobs = []
|
self.jobs = []
|
||||||
|
|
||||||
@@ -296,9 +324,13 @@ class Proxywatchd():
|
|||||||
self.collect_work()
|
self.collect_work()
|
||||||
|
|
||||||
if len(self.collected) > self.submit_after:
|
if len(self.collected) > self.submit_after:
|
||||||
self.submit_collected()
|
if not self.submit_collected():
|
||||||
|
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
|
||||||
|
self.collect_unfinished()
|
||||||
|
sleeptime = 1*60
|
||||||
|
|
||||||
time.sleep(sleeptime)
|
time.sleep(1)
|
||||||
|
sleeptime -= 1
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
Reference in New Issue
Block a user