diff --git a/ppf.py b/ppf.py index 825d277..c87bf48 100755 --- a/ppf.py +++ b/ppf.py @@ -176,7 +176,7 @@ if __name__ == '__main__': # start proxy watcher if config.watchd_threads > 0: watcherd = proxywatchd.Proxywatchd() - watcherd.run_background() + watcherd.start() else: watcherd = None @@ -191,11 +191,9 @@ if __name__ == '__main__': else: time.sleep(10) except KeyboardInterrupt: - print "XXXXXX" - if watcherd: watcherd.stop() + if watcherd: + watcherd.stop() + watcherd.finish() break print '\r', - - # stop things - #if watcherd: watcherd.stop() diff --git a/proxywatchd.py b/proxywatchd.py index 2485f60..31a29f1 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -11,6 +11,8 @@ import mysqlite from misc import _log import rocksock +_run_standalone = False + class WorkerJob(): def __init__(self, proxy, proto, failcount): self.proxy = proxy @@ -103,7 +105,6 @@ class WorkerThread(): return wd def start_thread(self): self.thread = threading.Thread(target=self.workloop) - self.thread.daemon = True self.thread.start() def workloop(self): while True: @@ -124,10 +125,26 @@ class Proxywatchd(): _log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True])) self.stopping.set() + def _cleanup(self): + for wt in self.threads: + wt.stop() + for wt in self.threads: + wt.term() + self.collect_work() + self.submit_collected() + self.mysqlite.close() + self.stopped.set() + + def finish(self): + if not self.in_background: self._cleanup() + while not self.stopped.is_set(): time.sleep(0.1) + def __init__(self): config.load() + self.in_background = False self.threads = [] self.stopping = threading.Event() + self.stopped = threading.Event() # create table if needed self.mysqlite = mysqlite.mysqlite(config.database, str) @@ -160,31 +177,36 @@ class Proxywatchd(): self.mysqlite.commit() self.collected = [] - def run_background(self): - t = threading.Thread(target=self.run) - t.daemon = True - t.start() + def start(self): + if config.watchd_threads == 1 and _run_standalone: + return self._run() + else: + return self._run_background() def run(self): + if self.in_background: + while 1: time.sleep(0.1) + + def _run_background(self): + self.in_background = True + t = threading.Thread(target=self._run) + t.start() + + def _run(self): _log('Starting proxywatchd..', 'notice') self.mysqlite = mysqlite.mysqlite(config.database, str) for i in range(config.watchd_threads): threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) wt = WorkerThread(threadid) - if config.watchd_threads > 1: + if self.in_background: wt.start_thread() self.threads.append(wt) while True: if self.stopping.is_set(): - for wt in self.threads: - wt.stop() - for wt in self.threads: - wt.term() - self.collect_work() - self.submit_collected() + if self.in_background: self._cleanup() break if len(self.jobs) == 0: @@ -196,7 +218,7 @@ class Proxywatchd(): self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) self.jobs = [] - if config.watchd_threads == 1: # single_thread scenario + if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() @@ -206,11 +228,18 @@ class Proxywatchd(): time.sleep(1) - self.mysqlite.close() if __name__ == '__main__': + _run_standalone = True + + config.load() + w = Proxywatchd() try: + w.start() w.run() except KeyboardInterrupt as e: + raise e + finally: w.stop() + w.finish()