rewrite threading code in jobwatchd
now it distributes the tasks properly among all threads, and it can be used as a standalone program. there are some minor performance issues which will be fixed shortly.
This commit is contained in:
@@ -26,3 +26,7 @@ def load():
|
|||||||
args = aparse.parse_args()
|
args = aparse.parse_args()
|
||||||
|
|
||||||
watchd_threads = args.watchd_threads
|
watchd_threads = args.watchd_threads
|
||||||
|
|
||||||
|
global servers
|
||||||
|
with open('servers.txt', 'r') as handle:
|
||||||
|
servers = handle.read().split('\n')
|
||||||
|
|||||||
14
ppf.py
14
ppf.py
@@ -149,7 +149,6 @@ def proxyleech(sqlite, rows):
|
|||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
config.load()
|
config.load()
|
||||||
print repr(config.torhosts)
|
|
||||||
proxies={'http':'socks4://%s' % random.choice(config.torhosts),'https':'socks4://%s' % random.choice(config.torhosts)}
|
proxies={'http':'socks4://%s' % random.choice(config.torhosts),'https':'socks4://%s' % random.choice(config.torhosts)}
|
||||||
|
|
||||||
sqlite = mysqlite.mysqlite(config.database, str)
|
sqlite = mysqlite.mysqlite(config.database, str)
|
||||||
@@ -172,7 +171,11 @@ if __name__ == '__main__':
|
|||||||
empty = [ urignore.append(i.split('/')[2]) for i in searx_instances ]
|
empty = [ urignore.append(i.split('/')[2]) for i in searx_instances ]
|
||||||
|
|
||||||
# start proxy watcher
|
# start proxy watcher
|
||||||
watcherd = proxywatchd.Proxywatchd() if config.watchd_threads > 0 else None
|
if config.watchd_threads > 0:
|
||||||
|
watcherd = proxywatchd.Proxywatchd()
|
||||||
|
watcherd.run_background()
|
||||||
|
else:
|
||||||
|
watcherd = None
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -184,9 +187,12 @@ if __name__ == '__main__':
|
|||||||
## sleep
|
## sleep
|
||||||
else: time.sleep(10)
|
else: time.sleep(10)
|
||||||
|
|
||||||
except KeyboardInterrupt: break
|
except KeyboardInterrupt:
|
||||||
|
print "XXXXXX"
|
||||||
|
if watcherd: watcherd.stop()
|
||||||
|
break
|
||||||
|
|
||||||
print '\r',
|
print '\r',
|
||||||
|
|
||||||
# stop things
|
# stop things
|
||||||
if watcherd: watcherd.stop()
|
#if watcherd: watcherd.stop()
|
||||||
|
|||||||
275
proxywatchd.py
275
proxywatchd.py
@@ -1,8 +1,7 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
from threading import Thread
|
import threading
|
||||||
import threading, commands
|
import time, random, string, re, copy
|
||||||
import socket, time, random, sys, string, re
|
|
||||||
import requests
|
import requests
|
||||||
#from geoip import geolite2
|
#from geoip import geolite2
|
||||||
|
|
||||||
@@ -12,50 +11,13 @@ import mysqlite
|
|||||||
from misc import _log
|
from misc import _log
|
||||||
import rocksock
|
import rocksock
|
||||||
|
|
||||||
class Proxywatchd(Thread):
|
class WorkerJob():
|
||||||
|
def __init__(self, proxy, proto, failcount):
|
||||||
def stop(self):
|
self.proxy = proxy
|
||||||
_log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if item.isAlive()]))
|
self.proto = proto
|
||||||
self.running = 0
|
self.failcount = failcount
|
||||||
|
self.nextcheck = None
|
||||||
def __init__(self):
|
self.duration = None
|
||||||
Thread.__init__(self)
|
|
||||||
config.load()
|
|
||||||
self.threads = []
|
|
||||||
self.running = 1
|
|
||||||
|
|
||||||
# create table if needed
|
|
||||||
self.mysqlite = mysqlite.mysqlite(config.database, str)
|
|
||||||
self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT)')
|
|
||||||
self.mysqlite.commit()
|
|
||||||
self.echoise = time.time() - 3600;
|
|
||||||
self.ticks = time.time() - 3600;
|
|
||||||
|
|
||||||
with open('servers.txt', 'r') as handle: self.servers = handle.read().split('\n')
|
|
||||||
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
_log('Starting proxywatchd..', 'notice')
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
self.mysqlite = mysqlite.mysqlite(config.database, str)
|
|
||||||
|
|
||||||
while self.running:
|
|
||||||
|
|
||||||
if len(threads) < config.watchd_threads:
|
|
||||||
t = threading.Thread(target=self.daemon, args=(self.servers,))
|
|
||||||
t.start()
|
|
||||||
threads.append(t)
|
|
||||||
time.sleep( random.choice( xrange(1,3)))
|
|
||||||
|
|
||||||
else: time.sleep(1)
|
|
||||||
|
|
||||||
if (time.time() - self.echoise) >= 180:
|
|
||||||
_log('Proxywatchd threads: %d/%d' % (len(threads), config.watchd_threads))
|
|
||||||
self.echoise = time.time()
|
|
||||||
|
|
||||||
self.mysqlite.close()
|
|
||||||
|
|
||||||
def is_drone_bl(self, proxy):
|
def is_drone_bl(self, proxy):
|
||||||
p = proxy.split(':')[0]
|
p = proxy.split(':')[0]
|
||||||
@@ -64,85 +26,196 @@ class Proxywatchd(Thread):
|
|||||||
if 'No incidents regarding' in resp.text: return 0
|
if 'No incidents regarding' in resp.text: return 0
|
||||||
else: return 1
|
else: return 1
|
||||||
|
|
||||||
def connect_socket(self, proxy, servers, proto = None):
|
def connect_socket(self):
|
||||||
protos = ['http', 'socks5', 'socks4'] if proto is None else proto
|
protos = ['http', 'socks5', 'socks4'] if self.proto is None else self.proto
|
||||||
|
|
||||||
for proto in protos:
|
for proto in protos:
|
||||||
torhost = random.choice(config.torhosts)
|
torhost = random.choice(config.torhosts)
|
||||||
duration = time.time()
|
duration = time.time()
|
||||||
proxies = [ rocksock.RocksockProxyFromURL('socks4://%s' % torhost),
|
proxies = [
|
||||||
rocksock.RocksockProxyFromURL('%s://%s' % (proto, proxy[0])),
|
rocksock.RocksockProxyFromURL('socks4://%s' % torhost),
|
||||||
]
|
rocksock.RocksockProxyFromURL('%s://%s' % (proto, self.proxy)),
|
||||||
|
]
|
||||||
|
|
||||||
srv = random.choice(servers).strip()
|
srv = random.choice(config.servers).strip()
|
||||||
try:
|
try:
|
||||||
sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=proxies, timeout=config.timeout)
|
sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=proxies, timeout=config.timeout)
|
||||||
sock.connect()
|
sock.connect()
|
||||||
sock.send('%s\n' % random.choice(['NICK', 'USER', 'JOIN', 'MODE', 'PART', 'INVITE', 'KNOCK', 'WHOIS', 'WHO', 'NOTICE', 'PRIVMSG', 'PING', 'QUIT']))
|
sock.send('%s\n' % random.choice(['NICK', 'USER', 'JOIN', 'MODE', 'PART', 'INVITE', 'KNOCK', 'WHOIS', 'WHO', 'NOTICE', 'PRIVMSG', 'PING', 'QUIT']))
|
||||||
return sock, proto, duration, torhost, srv
|
return sock, proto, duration, torhost, srv
|
||||||
|
except KeyboardInterrupt as e:
|
||||||
|
raise(e)
|
||||||
except: sock.disconnect()
|
except: sock.disconnect()
|
||||||
|
|
||||||
return None, None, None, None, None
|
return None, None, None, None, None
|
||||||
|
|
||||||
def daemon(self, servers):
|
def run(self):
|
||||||
sqlite = mysqlite.mysqlite(config.database, str)
|
self.nextcheck = (time.time() + 1800 + ((1+int(self.failcount)) * 3600))
|
||||||
threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] )
|
|
||||||
|
|
||||||
q = 'SELECT proxy,failed,country,proto FROM proxylist WHERE failed<? and tested<? ORDER BY RANDOM() LIMIT ?'
|
sock, proto, duration, tor, srv = self.connect_socket()
|
||||||
|
if not sock:
|
||||||
|
self.failcount += 1
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
recv = sock.recv(6)
|
||||||
|
|
||||||
while self.running:
|
# good data
|
||||||
sqlite_requests = []
|
if re.match('^(:|ERROR|PING|PONG|NOTICE|\*\*\*)', recv, re.IGNORECASE):
|
||||||
rows = sqlite.execute(q, (config.maxfail, time.time(), random.randint(10,20))).fetchall()
|
duration = (time.time() - duration)
|
||||||
if not len(rows):
|
self.nextcheck = (time.time() + 1800)
|
||||||
time.sleep(random.randint(10,20))
|
|
||||||
continue
|
|
||||||
|
|
||||||
abc = ' OR proxy='.join( [ '?' for x in xrange(0, len(rows)) ] )
|
#match = geolite2.lookup(proxy[0].split(':')[0])
|
||||||
args = [ (time.time() + 180) ]
|
match = None
|
||||||
e = [ args.append(i[0]) for i in rows ]
|
if match is not None: match = match.country
|
||||||
sqlite.executemany('UPDATE proxylist SET tested=? WHERE proxy=%s' % abc, (args,))
|
else: match = 'unknown'
|
||||||
sqlite.commit()
|
|
||||||
|
|
||||||
for proxy in rows:
|
#dronebl = self.is_drone_bl(proxy[0])
|
||||||
time.sleep(0.1)
|
self.proto = proto
|
||||||
nextcheck = (time.time() + 1800 + ((1+int(proxy[1])) * 3600))
|
self.duation = duration
|
||||||
|
self.failcount = 0
|
||||||
|
_log('%s://%s; c: %s; d: %d sec(s); tor: %s; srv: %s; recv: %s' % (proto, self.proxy, match, duration, tor, srv, recv), 'xxxxx')
|
||||||
|
except KeyboardInterrupt as e:
|
||||||
|
raise e
|
||||||
|
except:
|
||||||
|
self.failcount += 1
|
||||||
|
finally:
|
||||||
|
sock.disconnect()
|
||||||
|
|
||||||
sock, proto, duration, tor, srv = self.connect_socket(proxy, servers, proto=proxy[3])
|
|
||||||
if not sock:
|
|
||||||
sqlite_requests.append(((proxy[1]+1), nextcheck, 1, 'unknown', None, 0, proxy[0],))
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
class WorkerThread():
|
||||||
recv = sock.recv(6)
|
def __init__ (self, id):
|
||||||
|
self.id = id
|
||||||
|
self.done = threading.Event()
|
||||||
|
self.thread = None
|
||||||
|
self.workqueue = []
|
||||||
|
self.workdone = []
|
||||||
|
def stop(self):
|
||||||
|
self.done.set()
|
||||||
|
def term(self):
|
||||||
|
if self.thread: self.thread.join()
|
||||||
|
def add_jobs(self, jobs):
|
||||||
|
self.workqueue.extend(jobs)
|
||||||
|
def jobcount(self):
|
||||||
|
return len(self.workqueue)
|
||||||
|
def collect(self):
|
||||||
|
wd = copy.copy(self.workdone)
|
||||||
|
self.workdone = []
|
||||||
|
return wd
|
||||||
|
def start_thread(self):
|
||||||
|
self.thread = threading.Thread(target=self.workloop)
|
||||||
|
self.thread.daemon = True
|
||||||
|
self.thread.start()
|
||||||
|
def workloop(self):
|
||||||
|
while True:
|
||||||
|
if len(self.workqueue):
|
||||||
|
job = self.workqueue.pop()
|
||||||
|
job.run()
|
||||||
|
self.workdone.append(job)
|
||||||
|
elif not self.thread:
|
||||||
|
break
|
||||||
|
if self.done.is_set(): break
|
||||||
|
time.sleep(0.01)
|
||||||
|
if self.thread:
|
||||||
|
_log("thread %s terminated", self.id)
|
||||||
|
|
||||||
# good data
|
class Proxywatchd():
|
||||||
if re.match('^(:|ERROR|PING|PONG|NOTICE|\*\*\*)', recv, re.IGNORECASE):
|
|
||||||
duration = (time.time() - duration)
|
|
||||||
nextcheck = (time.time() + 1800)
|
|
||||||
|
|
||||||
#match = geolite2.lookup(proxy[0].split(':')[0])
|
def stop(self):
|
||||||
match = None
|
_log('Requesting proxywatchd to halt (%d thread(s))' % len([item for item in self.threads if True]))
|
||||||
if match is not None: match = match.country
|
self.stopping.set()
|
||||||
else: match = 'unknown'
|
|
||||||
|
|
||||||
#dronebl = self.is_drone_bl(proxy[0])
|
def __init__(self):
|
||||||
sqlite_requests.append( (0, nextcheck, 1, match, proto, duration, proxy[0],))
|
config.load()
|
||||||
_log('%s://%s; c: %s; d: %d sec(s); tor: %s; srv: %s; recv: %s' % (proto, proxy[0], match, duration, tor, srv, recv), threadid)
|
self.threads = []
|
||||||
|
self.stopping = threading.Event()
|
||||||
|
|
||||||
# bad data
|
# create table if needed
|
||||||
else:
|
self.mysqlite = mysqlite.mysqlite(config.database, str)
|
||||||
sqlite_requests.append(( (proxy[1]+1), nextcheck, 1, 'unknown', None, 0, proxy[0],))
|
self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT)')
|
||||||
|
self.mysqlite.commit()
|
||||||
|
self.mysqlite.close()
|
||||||
|
self.mysqlite = None
|
||||||
|
|
||||||
# also bad
|
self.echoise = time.time() - 3600;
|
||||||
except:
|
self.ticks = time.time() - 3600;
|
||||||
sqlite_requests.append(( (proxy[1]+1), nextcheck, 1, 'unknown', None, 0, proxy[0],))
|
self.jobs = []
|
||||||
|
self.collected = []
|
||||||
|
|
||||||
finally:
|
def prepare_jobs(self):
|
||||||
sock.disconnect()
|
q = 'SELECT proxy,proto,failed FROM proxylist WHERE failed<? and tested<? ORDER BY RANDOM()' # ' LIMIT ?'
|
||||||
|
rows = self.mysqlite.execute(q, (config.maxfail, time.time())).fetchall()
|
||||||
|
for row in rows:
|
||||||
|
job = WorkerJob(row[0], row[1], row[2])
|
||||||
|
self.jobs.append(job)
|
||||||
|
|
||||||
for r in sqlite_requests:
|
def collect_work(self):
|
||||||
sqlite.execute('UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,duration=? WHERE proxy=?', r)
|
for wt in self.threads:
|
||||||
sqlite.commit()
|
self.collected.extend(wt.collect())
|
||||||
|
|
||||||
sqlite.close()
|
def submit_collected(self):
|
||||||
|
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,duration=? WHERE proxy=?'
|
||||||
|
for job in self.collected:
|
||||||
|
self.mysqlite.execute(query, (job.failcount, job.nextcheck, 1, "unknown", job.proto, job.duration, job.proxy))
|
||||||
|
self.mysqlite.commit()
|
||||||
|
|
||||||
|
def find_best_thread(self):
|
||||||
|
least_jobs = 9999999999999999999
|
||||||
|
least_tid = 0
|
||||||
|
for i in range(len(self.threads)):
|
||||||
|
cnt = self.threads[i].jobcount()
|
||||||
|
if cnt < least_jobs:
|
||||||
|
least_jobs = cnt
|
||||||
|
least_tid = i
|
||||||
|
return least_tid
|
||||||
|
|
||||||
|
def run_background(self):
|
||||||
|
t = threading.Thread(target=self.run)
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
_log('Starting proxywatchd..', 'notice')
|
||||||
|
self.mysqlite = mysqlite.mysqlite(config.database, str)
|
||||||
|
|
||||||
|
for i in range(config.watchd_threads):
|
||||||
|
threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] )
|
||||||
|
wt = WorkerThread(threadid)
|
||||||
|
if config.watchd_threads > 1:
|
||||||
|
wt.start_thread()
|
||||||
|
self.threads.append(wt)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
if self.stopping.is_set():
|
||||||
|
for wt in self.threads:
|
||||||
|
wt.stop()
|
||||||
|
for wt in self.threads:
|
||||||
|
wt.term()
|
||||||
|
self.collect_work()
|
||||||
|
self.submit_collected()
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(self.jobs) == 0:
|
||||||
|
self.prepare_jobs()
|
||||||
|
while len(self.jobs):
|
||||||
|
tid = self.find_best_thread()
|
||||||
|
self.threads[tid].add_jobs([self.jobs.pop()])
|
||||||
|
|
||||||
|
if config.watchd_threads == 1: # single_thread scenario
|
||||||
|
self.threads[0].workloop()
|
||||||
|
|
||||||
|
self.collect_work()
|
||||||
|
|
||||||
|
if len(self.collected) > 50:
|
||||||
|
self.submit_collected()
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
self.mysqlite.close()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
w = Proxywatchd()
|
||||||
|
try:
|
||||||
|
w.run()
|
||||||
|
except KeyboardInterrupt as e:
|
||||||
|
w.stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user