From 5965312a9a082ea5c5f12ba6b8673d901b1a0637 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Sat, 6 Feb 2021 14:30:07 +0100
Subject: [PATCH] make leeching multithreaded, misc changes

---
 ppf.py | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 106 insertions(+), 7 deletions(-)

diff --git a/ppf.py b/ppf.py
index ef6f01d..691f20f 100755
--- a/ppf.py
+++ b/ppf.py
@@ -11,8 +11,10 @@ import sys
 from bs4 import BeautifulSoup
 import re
 import threading
+import random
 
 config = Config()
+_known_proxies = {}
 
 def import_from_file(fn, sqlite):
     with open(fn, 'r') as f:
@@ -53,13 +55,14 @@ def proxyleech(proxydb, urldb, url, stale_count, error, retrievals, proxies_adde
         if len(new) == 0:
             stale_count += 1
         else:
-            extract_urls(content, url)
             stale_count = 0
         if content == '':
             error += 1
         else:
             retrievals += 1
             error = 0
+            if unique_count:
+                extract_urls(content, url)
 
     urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=? where url=?', (error, stale_count, int(time.time()), retrievals, proxies_added+len(new), content_type, url))
     urldb.commit()
@@ -68,6 +71,7 @@ def proxyleech(proxydb, urldb, url, stale_count, error, retrievals, proxies_adde
 
     dbs.insert_proxies(proxydb, new, url)
 
+
 def is_bad_url(uri, domain=None, samedomain=False):
     # if uri needs to be from same domain and domains missmatch
     if samedomain and str(uri.split('/')[2]).lower() != str(domain).lower():
@@ -201,12 +205,88 @@ def start_server(ip, port):
     t.start()
     return t, done
 
+def extract_proxies(content):
+    # IP:port pairs; the lookahead requires a non-digit or end of input after
+    # the port ('$' inside a character class would be a literal dollar sign)
+    matches = re.findall(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})(?=\D|$)', fetch.cleanhtml(content))
+    uniques_dict = {}
+    for p in matches:
+        uniques_dict[p] = True
+
+    uniques = []
+    for p in uniques_dict.keys():
+        if fetch.is_usable_proxy(p): uniques.append(p)
+
+    return uniques
+
+
+class Leechered(threading.Thread):
+    #def __init__(self, proxydb, urldb, url, stale_count, error, retrievals, proxies_added, content_type):
+    def __init__(self, url, stale_count, error, retrievals, proxies_added, content_type):
+        threading.Thread.__init__(self)
+        self.status = 'nok'  # set to 'ok' as the last step of run(); polled by the main loop
+        self.proxylist = []
+        self.running = True
+        self.url = url
+        self.stale_count = stale_count
+        self.error = error
+        self.retrievals = retrievals
+        self.proxies_added = proxies_added
+        self.content_type = content_type
+        self.execute = ''
+
+    def retrieve(self):
+        return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.execute
+
+    def run(self):
+        self.status = 'nok'
+
+        if not self.content_type: self.content_type = get_content_type(self.url)
+
+        if is_good_content_type(self.content_type):
+            try: content = fetch.fetch_contents(self.url)
+            except KeyboardInterrupt as e: raise e
+            except: content = ''
+        else:
+            content = ''
+
+        unique = extract_proxies(content)
+        self.proxylist = [ proxy for proxy in unique if proxy not in _known_proxies ]
+        proxy_count = len(self.proxylist)
+
+        if self.retrievals == 0: # new site
+            if content != '' and proxy_count == 0: # site works but has zero proxy addresses
+                #error = 99999
+                self.error += 2
+        else:
+            if proxy_count == 0:
+                self.stale_count += 1
+            else:
+                self.stale_count = 0
+            if content == '':
+                self.error += 1
+            else:
+                self.retrievals += 1
+                self.error = 0
+                if proxy_count:
+                    extract_urls(content, self.url)
+
+        self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url)
+        self.status = 'ok'
+
+
 if __name__ == '__main__':
     config.load()
     fetch.set_config(config)
+
     proxydb = mysqlite.mysqlite(config.watchd.database, str)
     dbs.create_table_if_not_exists(proxydb, 'proxylist')
+    #global _known_proxies
+    #if len(_known_proxies) == 0:
+    known = proxydb.execute('SELECT proxy FROM proxylist').fetchall()
+    for k in known:
+        _known_proxies[k[0]] = True
 
     with open('urignore.txt', 'r') as f:
         urignore = [ i.strip() for i in f.read().split('\n') if len(i.strip()) ]
@@ -224,17 +304,36 @@ if __name__ == '__main__':
     else:
         watcherd = None
 
-    start_server(config.httpd.listenip, config.httpd.port)
+    #start_server(config.httpd.listenip, config.httpd.port)
+
+    qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?)<?)'
+
+    threads = []
+    reqtime = 0
 
     try:
         while True:
+            if time.time() - reqtime > 3:
+                rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, int(time.time()))).fetchall()
+                reqtime = time.time()
+                if len(rows) < 5:
+                    rows = []
 
-            if not len(rows): time.sleep(10)
+            # harvest finished workers on the main thread so all DB writes happen here
+            finished = [ thread for thread in threads if thread.status == 'ok' ]
+            for thread in finished:
+                url, proxylist, stale_count, error, retrievals, content_type, execute = thread.retrieve()
+                urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=? where url=?', execute)
+                urldb.commit()
+                if len(proxylist):
+                    dbs.insert_proxies(proxydb, proxylist, url)
+                    for p in proxylist: _known_proxies[p] = True
 
-            for row in rows:
-                proxyleech(proxydb, urldb, row[0], row[1], row[2], row[3], row[4], row[5])
+            # keep workers still running or finished after the snapshot above;
+            # drop the harvested ones and any that died without finishing
+            threads = [ thread for thread in threads if thread not in finished and (thread.is_alive() or thread.status == 'ok') ]
+            if len(threads) < 5 and len(rows):
+                row = random.choice(rows)
+                rows.remove(row)
+                t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5])
+                threads.append(t)
+                t.start()
+
+            time.sleep(0.1) # yield between polls instead of busy-waiting
 
     except KeyboardInterrupt:
         if watcherd:
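
Review note (not part of the patch): the original extract_proxies() regex ended
in [\D$], which consumes a character and treats '$' as a literal dollar sign
inside the class, so an address at the very end of the input is silently
dropped; the patch text above uses a zero-width lookahead instead. A standalone
check with a made-up sample string:

    import re

    sample = 'proxies: 10.0.0.1:8080, 192.168.1.2:3128 and last 172.16.0.3:1080'

    # original form: needs one trailing non-digit, '$' in a class is literal
    old = re.findall(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})[\D$]', sample)
    # lookahead form: matches before a non-digit or at end of input
    new = re.findall(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})(?=\D|$)', sample)

    print(old)  # ['10.0.0.1:8080', '192.168.1.2:3128'] - misses the last address
    print(new)  # all three addresses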
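
Review note (not part of the patch): the main loop polls each worker's status
attribute rather than using a result queue. A minimal, self-contained sketch of
that harvest pattern; Worker, jobs and the squaring "work" are illustrative
stand-ins, not names from ppf.py:

    import threading
    import time
    import random

    class Worker(threading.Thread):
        """Runs one job; the main thread polls .status and reads .result."""
        def __init__(self, job):
            threading.Thread.__init__(self)
            self.status = 'nok'  # flipped to 'ok' as the very last step of run()
            self.job = job
            self.result = None

        def run(self):
            time.sleep(random.random())    # stand-in for the network fetch
            self.result = self.job * self.job
            self.status = 'ok'             # publish: safe to harvest now

    jobs = list(range(20))
    threads = []
    while jobs or threads:
        # snapshot finished workers, then harvest them on the main thread,
        # mirroring how the patch keeps all database writes out of the workers
        finished = [ t for t in threads if t.status == 'ok' ]
        for t in finished:
            print('job', t.job, '->', t.result)
        # keep workers still running or finished after the snapshot; drop the rest
        threads = [ t for t in threads if t not in finished and (t.is_alive() or t.status == 'ok') ]
        while len(threads) < 5 and jobs:
            t = Worker(jobs.pop())
            threads.append(t)
            t.start()
        time.sleep(0.1)  # poll, don't spin

Writing .result before flipping .status means the flag only becomes visible once
the result is in place (attribute writes are atomic under CPython's GIL). A
queue.Queue or concurrent.futures.ThreadPoolExecutor would express the same idea
with less polling, but the flag-polling form matches the patch as written.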