diff --git a/proxywatchd.py b/proxywatchd.py index 615d614..acd993e 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -228,15 +228,32 @@ class Proxywatchd(): _log("collected %d unfinished jobs"%len(self.jobs), "watchd") def submit_collected(self): - if len(self.collected) == 0: return - _log("updating %d DB entries"%len(self.collected), 'watchd') + if len(self.collected) == 0: return True + sc = 0 + args = [] + for job in self.collected: + if job.failcount == 0: sc += 1 + args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) ) + + success_rate = (float(sc) / len(self.collected)) * 100 + ret = True + if len(self.collected) >= 100 and success_rate <= 2.0: + _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR") + if sc == 0: return False + args = [] + for job in self.collected: + if job.failcount == 0: + args.append( (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) ) + ret = False + + _log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd') self._prep_db() query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=? WHERE proxy=?' - args = [ (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.success_count, job.total_duration, job.proxy) for job in self.collected ] self.mysqlite.executemany(query, args) self.mysqlite.commit() self._close_db() self.collected = [] + return ret def start(self): if config.watchd_threads == 1 and _run_standalone: @@ -264,29 +281,40 @@ class Proxywatchd(): self.threads.append(wt) time.sleep( (random.random()/100) ) + sleeptime = 0 + while True: if self.stopping.is_set(): if self.in_background: self._cleanup() break - sleeptime = 1 + if sleeptime == 0: + sleeptime = 1 + else: + time.sleep(1) + sleeptime -= 1 + continue if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0: self.collect_unfinished() if not len(self.jobs): self.collect_work() - self.submit_collected() - self.prepare_jobs() + if not self.submit_collected(): + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd") + self.collect_unfinished() + sleeptime = 1*60 + else: + self.prepare_jobs() else: if len(self.jobs) < len(self.threads): # allow threads enough time to consume the jobs sleeptime = 10 if len(self.jobs): - _log("handing out %d jobs to %d thread(s)"% (len(self.jobs), config.watchd_threads), 'watchd') - jpt = len(self.jobs)/config.watchd_threads - if len(self.jobs)/float(config.watchd_threads) - jpt > 0.0: jpt += 1 - for tid in range(config.watchd_threads): + _log("handing out %d jobs to %d thread(s)"% (len(self.jobs), len(self.threads)), 'watchd') + jpt = len(self.jobs)/len(self.threads) + if len(self.jobs)/float(len(self.threads)) - jpt > 0.0: jpt += 1 + for tid in xrange(len(self.threads)): self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) self.jobs = [] @@ -296,9 +324,13 @@ class Proxywatchd(): self.collect_work() if len(self.collected) > self.submit_after: - self.submit_collected() + if not self.submit_collected(): + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues - consider decreasing thread number!", "watchd") + self.collect_unfinished() + sleeptime = 1*60 - time.sleep(sleeptime) + time.sleep(1) + sleeptime -= 1 if __name__ == '__main__':