From 05294186d44a98793f4e8d926d455d4847d88879 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 03:41:42 +0000 Subject: [PATCH 1/9] config sample: remove all unused items --- config.ini.sample | 6 ------ 1 file changed, 6 deletions(-) diff --git a/config.ini.sample b/config.ini.sample index f675586..b7c9a02 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -4,17 +4,11 @@ database = proxylist.sqlite proxy_max_fail = 5 [watcherd] -enabled = true proxy_file = false -checktime = 1800 threads = 10 timeout = 15 -read_timeout = 20 -max_fail = 5 [proxyfind] -enabled = true search = true -maxfail = 10 timeout = 30 threads = 3 From ffbe450aee43ca592d68f0e146976ced22314662 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 03:47:03 +0000 Subject: [PATCH 2/9] outsource configuration to external module --- config.py | 21 +++++++++++++++++++++ ppf.py | 22 ++++++++-------------- proxywatchd.py | 34 ++++++++++++---------------------- 3 files changed, 41 insertions(+), 36 deletions(-) create mode 100644 config.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..61c921b --- /dev/null +++ b/config.py @@ -0,0 +1,21 @@ +from ConfigParser import SafeConfigParser + +_loaded = False + +def load(): + if _loaded: return + global database, maxfail, search, torhosts, watchd_threads, checktime, timeout, read_timeout + + ## read the config files + parser = SafeConfigParser() + parser.read('config.ini') + + database = parser.get('global', 'database') + maxfail = parser.getint('global', 'proxy_max_fail') + + search = parser.getboolean('proxyfind', 'search') + + torhosts = [ str(i).strip() for i in parser.get('global', 'tor_host').split(',') ] + watchd_threads = parser.getint('watcherd', 'threads') + timeout = parser.getint('watcherd', 'timeout') + diff --git a/ppf.py b/ppf.py index a91a4de..59deb86 100755 --- a/ppf.py +++ b/ppf.py @@ -6,13 +6,13 @@ import random, time import re import urllib import hashlib -from ConfigParser import SafeConfigParser from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import mysqlite import proxywatchd from misc import _log from soup_parser import soupify +import config base_header = { 'Accept-Language':'en-US,en;q=0.8', @@ -24,7 +24,6 @@ base_header = { searx_instances = ('https://searx.me', 'https://searx.xyz', 'https://searx.site', 'https://searx.win', 'https://searx.ru', 'https://stemy.me/searx', 'https://searx.at', 'https://listi.me', 'https://searx.dk', 'https://searx.laquadrature.net' ) retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded') -CONFIG = 'config.ini' def cleanhtml(raw_html): cleanr = re.compile('<.*?>') @@ -69,7 +68,7 @@ def insert_proxies(proxies, uri, sqlite): def proxyfind(sqlite = None): #print('entering proxyfind...') - if not sqlite: sqlite = mysqlite.mysqlite(database,str) + if not sqlite: sqlite = mysqlite.mysqlite(config.database,str) uris = [ i[0] for i in sqlite.execute('SELECT url FROM uris WHERE error=0 and url not like "%github%" ORDER BY RANDOM() LIMIT 10').fetchall() ] @@ -149,16 +148,11 @@ def proxyleech(sqlite, rows): if __name__ == '__main__': - ## read the config files - parser = SafeConfigParser() - parser.read(CONFIG) + config.load() + print repr(config.torhosts) + proxies={'http':'socks4://%s' % random.choice(config.torhosts),'https':'socks4://%s' % random.choice(config.torhosts)} - database = parser.get('global', 'database') - search = parser.getboolean('proxyfind', 'search') - tor_hosts = parser.get('global', 'tor_host').split(',') - proxies={'http':'socks4://%s' % random.choice(tor_hosts),'https':'socks4://%s' % random.choice(tor_hosts)} - - sqlite = mysqlite.mysqlite(database, str) + sqlite = mysqlite.mysqlite(config.database, str) ## create dbs if required sqlite.execute('CREATE TABLE IF NOT EXISTS uris (added INT, url TEXT, check_time INT, error INT, driver INT, hash TEXT)') @@ -178,7 +172,7 @@ if __name__ == '__main__': empty = [ urignore.append(i.split('/')[2]) for i in searx_instances ] # start proxy watcher - watcherd = proxywatchd.Proxywatchd(CONFIG) if parser.getboolean('watcherd', 'enabled') else None + watcherd = proxywatchd.Proxywatchd() if config.watchd_threads > 0 else None while True: try: @@ -186,7 +180,7 @@ if __name__ == '__main__': rows = [ [i[0],i[1],i[2]] for i in sqlite.execute('SELECT url,hash,error FROM uris WHERE (check_time= 180: - _log('Proxywatchd threads: %d/%d' % (len(threads), self.maxthreads)) + _log('Proxywatchd threads: %d/%d' % (len(threads), config.watchd_threads)) self.echoise = time.time() self.mysqlite.close() def is_drone_bl(self, proxy): p = proxy.split(':')[0] - proxies = {'http':'socks4://%s:%s@%s' % (p,p,random.choice(self.torhosts))} + proxies = {'http':'socks4://%s:%s@%s' % (p,p,random.choice(config.torhosts))} resp = requests.get('http://dronebl.org/lookup?ip=%s' % p, proxies=proxies) if 'No incidents regarding' in resp.text: return 0 else: return 1 @@ -78,7 +68,7 @@ class Proxywatchd(Thread): protos = ['http', 'socks5', 'socks4'] if proto is None else proto for proto in protos: - torhost = random.choice(self.torhosts) + torhost = random.choice(config.torhosts) duration = time.time() proxies = [ rocksock.RocksockProxyFromURL('socks4://%s' % torhost), rocksock.RocksockProxyFromURL('%s://%s' % (proto, proxy[0])), @@ -86,7 +76,7 @@ class Proxywatchd(Thread): srv = random.choice(servers).strip() try: - sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=proxies, timeout=self.timeout) + sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=proxies, timeout=config.timeout) sock.connect() sock.send('%s\n' % random.choice(['NICK', 'USER', 'JOIN', 'MODE', 'PART', 'INVITE', 'KNOCK', 'WHOIS', 'WHO', 'NOTICE', 'PRIVMSG', 'PING', 'QUIT'])) return sock, proto, duration, torhost, srv @@ -96,14 +86,14 @@ class Proxywatchd(Thread): return None, None, None, None, None def daemon(self, servers): - sqlite = mysqlite.mysqlite(self.database, str) + sqlite = mysqlite.mysqlite(config.database, str) threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) q = 'SELECT proxy,failed,country,proto FROM proxylist WHERE failed Date: Sat, 5 Jan 2019 04:41:25 +0000 Subject: [PATCH 3/9] allow some config options optionally to be set by command line --- config.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config.py b/config.py index 61c921b..3204a09 100644 --- a/config.py +++ b/config.py @@ -19,3 +19,10 @@ def load(): watchd_threads = parser.getint('watcherd', 'threads') timeout = parser.getint('watcherd', 'timeout') + # allow overriding select items from the commandline + import argparse + aparse = argparse.ArgumentParser() + aparse.add_argument('--watchd_threads', help="how many proxy checker threads to spin up, 0==none, default: 10", type=int, default=watchd_threads, required=False) + args = aparse.parse_args() + + watchd_threads = args.watchd_threads From 9ac3ed45d6a9a68cdee79f4dd8f5c72e2aada6e7 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 06:35:41 +0000 Subject: [PATCH 4/9] rewrite threading code in jobwatchd now it distributes the tasks properly among all threads, and it can be used as a standalone program. there are some minor performance issues which will be fixed shortly. --- config.py | 4 + ppf.py | 14 ++- proxywatchd.py | 275 +++++++++++++++++++++++++++++++------------------ 3 files changed, 188 insertions(+), 105 deletions(-) diff --git a/config.py b/config.py index 3204a09..421c064 100644 --- a/config.py +++ b/config.py @@ -26,3 +26,7 @@ def load(): args = aparse.parse_args() watchd_threads = args.watchd_threads + + global servers + with open('servers.txt', 'r') as handle: + servers = handle.read().split('\n') diff --git a/ppf.py b/ppf.py index 59deb86..a7e308e 100755 --- a/ppf.py +++ b/ppf.py @@ -149,7 +149,6 @@ def proxyleech(sqlite, rows): if __name__ == '__main__': config.load() - print repr(config.torhosts) proxies={'http':'socks4://%s' % random.choice(config.torhosts),'https':'socks4://%s' % random.choice(config.torhosts)} sqlite = mysqlite.mysqlite(config.database, str) @@ -172,7 +171,11 @@ if __name__ == '__main__': empty = [ urignore.append(i.split('/')[2]) for i in searx_instances ] # start proxy watcher - watcherd = proxywatchd.Proxywatchd() if config.watchd_threads > 0 else None + if config.watchd_threads > 0: + watcherd = proxywatchd.Proxywatchd() + watcherd.run_background() + else: + watcherd = None while True: try: @@ -184,9 +187,12 @@ if __name__ == '__main__': ## sleep else: time.sleep(10) - except KeyboardInterrupt: break + except KeyboardInterrupt: + print "XXXXXX" + if watcherd: watcherd.stop() + break print '\r', # stop things - if watcherd: watcherd.stop() + #if watcherd: watcherd.stop() diff --git a/proxywatchd.py b/proxywatchd.py index 1af99e0..ae4aef7 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -1,8 +1,7 @@ #!/usr/bin/env python -from threading import Thread -import threading, commands -import socket, time, random, sys, string, re +import threading +import time, random, string, re, copy import requests #from geoip import geolite2 @@ -12,50 +11,13 @@ import mysqlite from misc import _log import rocksock -class Proxywatchd(Thread): - - def stop(self): - _log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if item.isAlive()])) - self.running = 0 - - def __init__(self): - Thread.__init__(self) - config.load() - self.threads = [] - self.running = 1 - - # create table if needed - self.mysqlite = mysqlite.mysqlite(config.database, str) - self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT)') - self.mysqlite.commit() - self.echoise = time.time() - 3600; - self.ticks = time.time() - 3600; - - with open('servers.txt', 'r') as handle: self.servers = handle.read().split('\n') - - self.start() - - def run(self): - _log('Starting proxywatchd..', 'notice') - - threads = [] - self.mysqlite = mysqlite.mysqlite(config.database, str) - - while self.running: - - if len(threads) < config.watchd_threads: - t = threading.Thread(target=self.daemon, args=(self.servers,)) - t.start() - threads.append(t) - time.sleep( random.choice( xrange(1,3))) - - else: time.sleep(1) - - if (time.time() - self.echoise) >= 180: - _log('Proxywatchd threads: %d/%d' % (len(threads), config.watchd_threads)) - self.echoise = time.time() - - self.mysqlite.close() +class WorkerJob(): + def __init__(self, proxy, proto, failcount): + self.proxy = proxy + self.proto = proto + self.failcount = failcount + self.nextcheck = None + self.duration = None def is_drone_bl(self, proxy): p = proxy.split(':')[0] @@ -64,85 +26,196 @@ class Proxywatchd(Thread): if 'No incidents regarding' in resp.text: return 0 else: return 1 - def connect_socket(self, proxy, servers, proto = None): - protos = ['http', 'socks5', 'socks4'] if proto is None else proto + def connect_socket(self): + protos = ['http', 'socks5', 'socks4'] if self.proto is None else self.proto for proto in protos: torhost = random.choice(config.torhosts) duration = time.time() - proxies = [ rocksock.RocksockProxyFromURL('socks4://%s' % torhost), - rocksock.RocksockProxyFromURL('%s://%s' % (proto, proxy[0])), - ] + proxies = [ + rocksock.RocksockProxyFromURL('socks4://%s' % torhost), + rocksock.RocksockProxyFromURL('%s://%s' % (proto, self.proxy)), + ] - srv = random.choice(servers).strip() + srv = random.choice(config.servers).strip() try: sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=proxies, timeout=config.timeout) sock.connect() sock.send('%s\n' % random.choice(['NICK', 'USER', 'JOIN', 'MODE', 'PART', 'INVITE', 'KNOCK', 'WHOIS', 'WHO', 'NOTICE', 'PRIVMSG', 'PING', 'QUIT'])) return sock, proto, duration, torhost, srv - + except KeyboardInterrupt as e: + raise(e) except: sock.disconnect() return None, None, None, None, None - def daemon(self, servers): - sqlite = mysqlite.mysqlite(config.database, str) - threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) + def run(self): + self.nextcheck = (time.time() + 1800 + ((1+int(self.failcount)) * 3600)) - q = 'SELECT proxy,failed,country,proto FROM proxylist WHERE failed 1: + wt.start_thread() + self.threads.append(wt) + + while True: + + if self.stopping.is_set(): + for wt in self.threads: + wt.stop() + for wt in self.threads: + wt.term() + self.collect_work() + self.submit_collected() + break + + if len(self.jobs) == 0: + self.prepare_jobs() + while len(self.jobs): + tid = self.find_best_thread() + self.threads[tid].add_jobs([self.jobs.pop()]) + + if config.watchd_threads == 1: # single_thread scenario + self.threads[0].workloop() + + self.collect_work() + + if len(self.collected) > 50: + self.submit_collected() + + time.sleep(1) + + self.mysqlite.close() + +if __name__ == '__main__': + w = Proxywatchd() + try: + w.run() + except KeyboardInterrupt as e: + w.stop() From f45cd1190cd8d49c75610ad051e4df9150375bb2 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 06:58:46 +0000 Subject: [PATCH 5/9] fix performance issue in proxywatchd --- proxywatchd.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/proxywatchd.py b/proxywatchd.py index ae4aef7..7562657 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -158,16 +158,6 @@ class Proxywatchd(): self.mysqlite.execute(query, (job.failcount, job.nextcheck, 1, "unknown", job.proto, job.duration, job.proxy)) self.mysqlite.commit() - def find_best_thread(self): - least_jobs = 9999999999999999999 - least_tid = 0 - for i in range(len(self.threads)): - cnt = self.threads[i].jobcount() - if cnt < least_jobs: - least_jobs = cnt - least_tid = i - return least_tid - def run_background(self): t = threading.Thread(target=self.run) t.daemon = True @@ -197,9 +187,12 @@ class Proxywatchd(): if len(self.jobs) == 0: self.prepare_jobs() - while len(self.jobs): - tid = self.find_best_thread() - self.threads[tid].add_jobs([self.jobs.pop()]) + if len(self.jobs): + jpt = len(self.jobs)/config.watchd_threads + if len(self.jobs)/float(config.watchd_threads) - jpt > 0.0: jpt += 1 + for tid in range(config.watchd_threads): + self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) + self.jobs = [] if config.watchd_threads == 1: # single_thread scenario self.threads[0].workloop() From 42f35855ea8ff7f40d78c1560d600e1b3695f899 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 16:39:55 +0000 Subject: [PATCH 6/9] use a variable for the min number of jobs to collect this variable determines after how many results the db is written to. with a low value, it is written often and could more lock issues if 2 threads concur for the db. --- proxywatchd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxywatchd.py b/proxywatchd.py index 7562657..a8145a9 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -136,6 +136,7 @@ class Proxywatchd(): self.mysqlite.close() self.mysqlite = None + self.submit_after = 200 # number of collected jobs before writing db self.echoise = time.time() - 3600; self.ticks = time.time() - 3600; self.jobs = [] @@ -199,7 +200,7 @@ class Proxywatchd(): self.collect_work() - if len(self.collected) > 50: + if len(self.collected) > self.submit_after: self.submit_collected() time.sleep(1) From 47221bae94362f1a40540cd51d54e91652ec9fca Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 16:44:31 +0000 Subject: [PATCH 7/9] properly clean submitted job list after db write --- proxywatchd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/proxywatchd.py b/proxywatchd.py index a8145a9..2485f60 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -158,6 +158,7 @@ class Proxywatchd(): for job in self.collected: self.mysqlite.execute(query, (job.failcount, job.nextcheck, 1, "unknown", job.proto, job.duration, job.proxy)) self.mysqlite.commit() + self.collected = [] def run_background(self): t = threading.Thread(target=self.run) From bb3da7122e5928276e95bc2a7e41da19645de086 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 17:11:08 +0000 Subject: [PATCH 8/9] ppf: properly reraise keyboard interrupts --- ppf.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ppf.py b/ppf.py index a7e308e..825d277 100755 --- a/ppf.py +++ b/ppf.py @@ -44,6 +44,7 @@ def import_from_file(fn, sqlite): def fetch_contents(uri): headers = base_header try: resp = requests.get(uri, timeout=45, headers=headers, verify=False, proxies=proxies) + except KeyboardInterrupt as e: raise e except: return '' data = resp.text @@ -112,6 +113,7 @@ def proxyleech(sqlite, rows): for row in rows: try: content = fetch_contents(row[0]) + except KeyboardInterrupt as e: raise e except: content = '' uniques = [] @@ -119,6 +121,7 @@ def proxyleech(sqlite, rows): if p in uniques: continue try: if not is_reserved_ipv4(p.split(':')[0]): uniques.append(p) + except KeyboardInterrupt as e: raise e except: pass From af8f82924fdcb841133c0265b2536a9409ef0396 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Sat, 5 Jan 2019 17:13:39 +0000 Subject: [PATCH 9/9] fix logic so threads do an orderly shutdown basically the issue was that the main loop received the SIGINT and therefore broke out before reaching the parts of the code that care about bringing down the child threads. therefore there's now a finish() method that needs to be called after stop(). because sqlite dbs insists to be used from the thread that created the object, the DB cleanup operation are done from the thread that controls it. for standalone operation, in order to keep the main thread alive, an additional run() method is used. this is not necessary when used via ppf.py. --- ppf.py | 10 ++++----- proxywatchd.py | 57 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/ppf.py b/ppf.py index 825d277..c87bf48 100755 --- a/ppf.py +++ b/ppf.py @@ -176,7 +176,7 @@ if __name__ == '__main__': # start proxy watcher if config.watchd_threads > 0: watcherd = proxywatchd.Proxywatchd() - watcherd.run_background() + watcherd.start() else: watcherd = None @@ -191,11 +191,9 @@ if __name__ == '__main__': else: time.sleep(10) except KeyboardInterrupt: - print "XXXXXX" - if watcherd: watcherd.stop() + if watcherd: + watcherd.stop() + watcherd.finish() break print '\r', - - # stop things - #if watcherd: watcherd.stop() diff --git a/proxywatchd.py b/proxywatchd.py index 2485f60..31a29f1 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -11,6 +11,8 @@ import mysqlite from misc import _log import rocksock +_run_standalone = False + class WorkerJob(): def __init__(self, proxy, proto, failcount): self.proxy = proxy @@ -103,7 +105,6 @@ class WorkerThread(): return wd def start_thread(self): self.thread = threading.Thread(target=self.workloop) - self.thread.daemon = True self.thread.start() def workloop(self): while True: @@ -124,10 +125,26 @@ class Proxywatchd(): _log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True])) self.stopping.set() + def _cleanup(self): + for wt in self.threads: + wt.stop() + for wt in self.threads: + wt.term() + self.collect_work() + self.submit_collected() + self.mysqlite.close() + self.stopped.set() + + def finish(self): + if not self.in_background: self._cleanup() + while not self.stopped.is_set(): time.sleep(0.1) + def __init__(self): config.load() + self.in_background = False self.threads = [] self.stopping = threading.Event() + self.stopped = threading.Event() # create table if needed self.mysqlite = mysqlite.mysqlite(config.database, str) @@ -160,31 +177,36 @@ class Proxywatchd(): self.mysqlite.commit() self.collected = [] - def run_background(self): - t = threading.Thread(target=self.run) - t.daemon = True - t.start() + def start(self): + if config.watchd_threads == 1 and _run_standalone: + return self._run() + else: + return self._run_background() def run(self): + if self.in_background: + while 1: time.sleep(0.1) + + def _run_background(self): + self.in_background = True + t = threading.Thread(target=self._run) + t.start() + + def _run(self): _log('Starting proxywatchd..', 'notice') self.mysqlite = mysqlite.mysqlite(config.database, str) for i in range(config.watchd_threads): threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) wt = WorkerThread(threadid) - if config.watchd_threads > 1: + if self.in_background: wt.start_thread() self.threads.append(wt) while True: if self.stopping.is_set(): - for wt in self.threads: - wt.stop() - for wt in self.threads: - wt.term() - self.collect_work() - self.submit_collected() + if self.in_background: self._cleanup() break if len(self.jobs) == 0: @@ -196,7 +218,7 @@ class Proxywatchd(): self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt]) self.jobs = [] - if config.watchd_threads == 1: # single_thread scenario + if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() @@ -206,11 +228,18 @@ class Proxywatchd(): time.sleep(1) - self.mysqlite.close() if __name__ == '__main__': + _run_standalone = True + + config.load() + w = Proxywatchd() try: + w.start() w.run() except KeyboardInterrupt as e: + raise e + finally: w.stop() + w.finish()