From af8f82924fdcb841133c0265b2536a9409ef0396 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 17:13:39 +0000 Subject: [PATCH] fix logic so threads do an orderly shutdown basically the issue was that the main loop received the SIGINT and therefore broke out before reaching the parts of the code that care about bringing down the child threads. therefore there's now a finish() method that needs to be called after stop(). because sqlite dbs insists to be used from the thread that created the object, the DB cleanup operation are done from the thread that controls it. for standalone operation, in order to keep the main thread alive, an additional run() method is used. this is not necessary when used via ppf.py. --- ppf.py | 10 ++++----- proxywatchd.py | 57 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/ppf.py b/ppf.py index 825d277..c87bf48 100755 --- a/ppf.py +++ b/ppf.py @@ -176,7 +176,7 @@ if __name__ == '__main__': # start proxy watcher if config.watchd_threads > 0: watcherd = proxywatchd.Proxywatchd() - watcherd.run_background() + watcherd.start() else: watcherd = None @@ -191,11 +191,9 @@ if __name__ == '__main__': else: time.sleep(10) except KeyboardInterrupt: - print "XXXXXX" - if watcherd: watcherd.stop() + if watcherd: + watcherd.stop() + watcherd.finish() break print '\r', - - # stop things - #if watcherd: watcherd.stop() diff --git a/proxywatchd.py b/proxywatchd.py index 2485f60..31a29f1 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -11,6 +11,8 @@ import mysqlite from misc import _log import rocksock +_run_standalone = False + class WorkerJob(): def __init__(self, proxy, proto, failcount): self.proxy = proxy @@ -103,7 +105,6 @@ class WorkerThread(): return wd def start_thread(self): self.thread = threading.Thread(target=self.workloop) - self.thread.daemon = True self.thread.start() def workloop(self): while True: @@ -124,10 +125,26 @@ class Proxywatchd(): _log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True])) self.stopping.set() + def _cleanup(self): + for wt in self.threads: + wt.stop() + for wt in self.threads: + wt.term() + self.collect_work() + self.submit_collected() + self.mysqlite.close() + self.stopped.set() + + def finish(self): + if not self.in_background: self._cleanup() + while not self.stopped.is_set(): time.sleep(0.1) + def __init__(self): config.load() + self.in_background = False self.threads = [] self.stopping = threading.Event() + self.stopped = threading.Event() # create table if needed self.mysqlite = mysqlite.mysqlite(config.database, str) @@ -160,31 +177,36 @@ class Proxywatchd(): self.mysqlite.commit() self.collected = [] - def run_background(self): - t = threading.Thread(target=self.run) - t.daemon = True - t.start() + def start(self): + if config.watchd_threads == 1 and _run_standalone: + return self._run() + else: + return self._run_background() def run(self): + if self.in_background: + while 1: time.sleep(0.1) + + def _run_background(self): + self.in_background = True + t = threading.Thread(target=self._run) + t.start() + + def _run(self): _log('Starting proxywatchd..', 'notice') self.mysqlite = mysqlite.mysqlite(config.database, str) for i in range(config.watchd_threads): threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) wt = WorkerThread(threadid) - if config.watchd_threads > 1: + if self.in_background: wt.start_thread() self.threads.append(wt) while True: if self.stopping.is_set(): - for wt in self.threads: - wt.stop() - for wt in self.threads: - wt.term() - self.collect_work() - self.submit_collected() + if self.in_background: self._cleanup() break if len(self.jobs) == 0: @@ -196,7 +218,7 @@ class Proxywatchd(): self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) self.jobs = [] - if config.watchd_threads == 1: # single_thread scenario + if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() @@ -206,11 +228,18 @@ class Proxywatchd(): time.sleep(1) - self.mysqlite.close() if __name__ == '__main__': + _run_standalone = True + + config.load() + w = Proxywatchd() try: + w.start() w.run() except KeyboardInterrupt as e: + raise e + finally: w.stop() + w.finish()