watchd: distribute remaining jobs among threads if one is idle

This commit is contained in:
rofl0r
2019-01-07 05:39:43 +00:00
parent 898c8f36ee
commit d93f14ad67

View File

@@ -113,6 +113,10 @@ class WorkerThread():
if self.thread: self.thread.join()
def add_jobs(self, jobs):
self.workqueue.extend(jobs)
def return_jobs(self):
jobs = self.workqueue
self.workqueue = []
return jobs
def jobcount(self):
return len(self.workqueue)
def collect(self):
@@ -210,7 +214,15 @@ class Proxywatchd():
for wt in self.threads:
self.collected.extend(wt.collect())
def collect_unfinished(self):
for wt in self.threads:
jobs = wt.return_jobs()
self.jobs.extend(jobs)
if len(self.jobs):
_log("collected %d unfinished jobs"%len(self.jobs), "watchd")
def submit_collected(self):
if len(self.collected) == 0: return
_log("watchd main thread: updating %d entries"%len(self.collected))
self._prep_db()
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=? WHERE proxy=?'
@@ -252,8 +264,18 @@ class Proxywatchd():
if self.in_background: self._cleanup()
break
sleeptime = 1
if self.threads[random.choice(xrange(len(self.threads)))].jobcount() == 0:
self.prepare_jobs()
self.collect_unfinished()
if not len(self.jobs):
self.collect_work()
self.submit_collected()
self.prepare_jobs()
else:
if len(self.jobs) < len(self.threads):
# allow threads enough time to consume the jobs
sleeptime = 10
if len(self.jobs):
_log("watchd main: handing out %d jobs and %d thread(s)"% (len(self.jobs), config.watchd_threads))
jpt = len(self.jobs)/config.watchd_threads
@@ -270,7 +292,7 @@ class Proxywatchd():
if len(self.collected) > self.submit_after:
self.submit_collected()
time.sleep(1)
time.sleep(sleeptime)
if __name__ == '__main__':