Merge branch 'changes8' into 'master'

watchd: first halfbaked attempt to detect/combat tor issues

See merge request mserneels/ppf!9
This commit is contained in:
mserneels
2019-01-07 21:38:12 +00:00

View File

@@ -228,15 +228,32 @@ class Proxywatchd():
_log("collected %d unfinished jobs"%len(self.jobs), "watchd")
def submit_collected(self):
if len(self.collected) == 0: return
_log("updating %d DB entries"%len(self.collected), 'watchd')
if len(self.collected) == 0: return True
sc = 0
args = []
for job in self.collected:
if job.failcount == 0: sc += 1
args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) )
success_rate = (float(sc) / len(self.collected)) * 100
ret = True
if len(self.collected) >= 100 and success_rate <= 2.0:
_log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR")
if sc == 0: return False
args = []
for job in self.collected:
if job.failcount == 0:
args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) )
ret = False
_log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd')
self._prep_db()
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=? WHERE proxy=?'
args = [ (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) for job in self.collected ]
self.mysqlite.executemany(query, args)
self.mysqlite.commit()
self._close_db()
self.collected = []
return ret
def start(self):
if config.watchd_threads == 1 and _run_standalone:
@@ -264,29 +281,40 @@ class Proxywatchd():
self.threads.append(wt)
time.sleep( (random.random()/100) )
sleeptime = 0
while True:
if self.stopping.is_set():
if self.in_background: self._cleanup()
break
sleeptime = 1
if sleeptime == 0:
sleeptime = 1
else:
time.sleep(1)
sleeptime -= 1
continue
if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0:
self.collect_unfinished()
if not len(self.jobs):
self.collect_work()
self.submit_collected()
self.prepare_jobs()
if not self.submit_collected():
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
self.collect_unfinished()
sleeptime = 1*60
else:
self.prepare_jobs()
else:
if len(self.jobs) < len(self.threads):
# allow threads enough time to consume the jobs
sleeptime = 10
if len(self.jobs):
_log("handing out %d jobs to %d thread(s)"% (len(self.jobs), config.watchd_threads), 'watchd')
jpt = len(self.jobs)/config.watchd_threads
if len(self.jobs)/float(config.watchd_threads) - jpt > 0.0: jpt += 1
for tid in range(config.watchd_threads):
_log("handing out %d jobs to %d thread(s)"% (len(self.jobs), len(self.threads)), 'watchd')
jpt = len(self.jobs)/len(self.threads)
if len(self.jobs)/float(len(self.threads)) - jpt > 0.0: jpt += 1
for tid in xrange(len(self.threads)):
self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
self.jobs = []
@@ -296,9 +324,13 @@ class Proxywatchd():
self.collect_work()
if len(self.collected) > self.submit_after:
self.submit_collected()
if not self.submit_collected():
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd")
self.collect_unfinished()
sleeptime = 1*60
time.sleep(sleeptime)
time.sleep(1)
sleeptime -= 1
if __name__ == '__main__':