fix logic so threads do an orderly shutdown

basically the issue was that the main loop received the SIGINT
and therefore broke out before reaching the parts of the code
that care about bringing down the child threads.

therefore there's now a finish() method that needs to be called
after stop().

because sqlite dbs insists to be used from the thread that created
the object, the DB cleanup operation are done from the thread
that controls it.

for standalone operation, in order to keep the main thread alive,
an additional run() method is used. this is not necessary when
used via ppf.py.
This commit is contained in:
rofl0r
2019-01-05 17:13:39 +00:00
parent bb3da7122e
commit af8f82924f
2 changed files with 47 additions and 20 deletions

10
ppf.py
View File

@@ -176,7 +176,7 @@ if __name__ == '__main__':
# start proxy watcher # start proxy watcher
if config.watchd_threads > 0: if config.watchd_threads > 0:
watcherd = proxywatchd.Proxywatchd() watcherd = proxywatchd.Proxywatchd()
watcherd.run_background() watcherd.start()
else: else:
watcherd = None watcherd = None
@@ -191,11 +191,9 @@ if __name__ == '__main__':
else: time.sleep(10) else: time.sleep(10)
except KeyboardInterrupt: except KeyboardInterrupt:
print "XXXXXX" if watcherd:
if watcherd: watcherd.stop() watcherd.stop()
watcherd.finish()
break break
print '\r', print '\r',
# stop things
#if watcherd: watcherd.stop()

View File

@@ -11,6 +11,8 @@ import mysqlite
from misc import _log from misc import _log
import rocksock import rocksock
_run_standalone = False
class WorkerJob(): class WorkerJob():
def __init__(self, proxy, proto, failcount): def __init__(self, proxy, proto, failcount):
self.proxy = proxy self.proxy = proxy
@@ -103,7 +105,6 @@ class WorkerThread():
return wd return wd
def start_thread(self): def start_thread(self):
self.thread = threading.Thread(target=self.workloop) self.thread = threading.Thread(target=self.workloop)
self.thread.daemon = True
self.thread.start() self.thread.start()
def workloop(self): def workloop(self):
while True: while True:
@@ -124,10 +125,26 @@ class Proxywatchd():
_log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True])) _log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True]))
self.stopping.set() self.stopping.set()
def _cleanup(self):
for wt in self.threads:
wt.stop()
for wt in self.threads:
wt.term()
self.collect_work()
self.submit_collected()
self.mysqlite.close()
self.stopped.set()
def finish(self):
if not self.in_background: self._cleanup()
while not self.stopped.is_set(): time.sleep(0.1)
def __init__(self): def __init__(self):
config.load() config.load()
self.in_background = False
self.threads = [] self.threads = []
self.stopping = threading.Event() self.stopping = threading.Event()
self.stopped = threading.Event()
# create table if needed # create table if needed
self.mysqlite = mysqlite.mysqlite(config.database, str) self.mysqlite = mysqlite.mysqlite(config.database, str)
@@ -160,31 +177,36 @@ class Proxywatchd():
self.mysqlite.commit() self.mysqlite.commit()
self.collected = [] self.collected = []
def run_background(self): def start(self):
t = threading.Thread(target=self.run) if config.watchd_threads == 1 and _run_standalone:
t.daemon = True return self._run()
t.start() else:
return self._run_background()
def run(self): def run(self):
if self.in_background:
while 1: time.sleep(0.1)
def _run_background(self):
self.in_background = True
t = threading.Thread(target=self._run)
t.start()
def _run(self):
_log('Starting proxywatchd..', 'notice') _log('Starting proxywatchd..', 'notice')
self.mysqlite = mysqlite.mysqlite(config.database, str) self.mysqlite = mysqlite.mysqlite(config.database, str)
for i in range(config.watchd_threads): for i in range(config.watchd_threads):
threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] )
wt = WorkerThread(threadid) wt = WorkerThread(threadid)
if config.watchd_threads > 1: if self.in_background:
wt.start_thread() wt.start_thread()
self.threads.append(wt) self.threads.append(wt)
while True: while True:
if self.stopping.is_set(): if self.stopping.is_set():
for wt in self.threads: if self.in_background: self._cleanup()
wt.stop()
for wt in self.threads:
wt.term()
self.collect_work()
self.submit_collected()
break break
if len(self.jobs) == 0: if len(self.jobs) == 0:
@@ -196,7 +218,7 @@ class Proxywatchd():
self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
self.jobs = [] self.jobs = []
if config.watchd_threads == 1: # single_thread scenario if not self.in_background: # single_thread scenario
self.threads[0].workloop() self.threads[0].workloop()
self.collect_work() self.collect_work()
@@ -206,11 +228,18 @@ class Proxywatchd():
time.sleep(1) time.sleep(1)
self.mysqlite.close()
if __name__ == '__main__': if __name__ == '__main__':
_run_standalone = True
config.load()
w = Proxywatchd() w = Proxywatchd()
try: try:
w.start()
w.run() w.run()
except KeyboardInterrupt as e: except KeyboardInterrupt as e:
raise e
finally:
w.stop() w.stop()
w.finish()