Files
ppf/proxywatchd.py
2019-01-05 23:39:36 +00:00

257 lines
7.0 KiB
Python

#!/usr/bin/env python
import threading
import time, random, string, re, copy
import requests
#from geoip import geolite2
import config
import mysqlite
from misc import _log
import rocksock
# Set to True by the ``__main__`` guard at the bottom of this file when the
# module is executed as a script. Proxywatchd.start() reads it: only when it
# is True *and* config.watchd_threads == 1 does the watchdog run in the
# calling thread instead of a background thread.
_run_standalone = False
class WorkerJob():
    """One liveness check of a single proxy.

    The check chains a randomly chosen host from ``config.torhosts`` (as a
    socks4 proxy) with the proxy under test, connects to a random entry of
    ``config.servers`` on port 6697 with SSL, sends one IRC command word and
    looks at the first bytes of the reply.

    Attributes:
        proxy: proxy address string as used in a proxy URL; is_drone_bl()
            treats the part before ':' as the IP.
        proto: protocol string of the proxy, or None if not yet known.
        failcount: consecutive failed checks (reset to 0 on success).
        nextcheck: UNIX timestamp of the next scheduled check, set by run().
        duration: seconds the last successful check took, else None.
        success_count: number of successful checks so far.
        total_duration: accumulated check time; run() adds duration * 1000.
    """

    # Protocols probed, in this order, while the proxy's protocol is unknown.
    DEFAULT_PROTOS = ('http', 'socks5', 'socks4')

    # One of these words is sent as the probe line after connecting.
    PROBE_COMMANDS = ['NICK', 'USER', 'JOIN', 'MODE', 'PART', 'INVITE', 'KNOCK', 'WHOIS', 'WHO', 'NOTICE', 'PRIVMSG', 'PING', 'QUIT']

    # Reply prefixes that count as "good data" from the server.
    GOOD_REPLY = re.compile(r'^(:|ERROR|PING|PONG|NOTICE|\*\*\*)', re.IGNORECASE)

    def __init__(self, proxy, proto, failcount, success_count, total_duration):
        self.proxy = proxy
        self.proto = proto
        # Normalise the counters once so the arithmetic in run() cannot trip
        # over None or a numeric string handed in by the caller.
        self.failcount = int(failcount or 0)
        self.nextcheck = None
        self.duration = None
        self.success_count = success_count or 0
        self.total_duration = total_duration or 0

    def is_drone_bl(self, proxy):
        """Return 0 if dronebl.org reports no incidents for the IP, else 1.

        The lookup is sent through a random host from ``config.torhosts``
        using the IP as socks4 user name and password.
        NOTE(review): presumably that is meant to isolate lookups per IP on
        the Tor side - confirm.
        """
        ip = proxy.split(':')[0]
        via = {'http': 'socks4://%s:%s@%s' % (ip, ip, random.choice(config.torhosts))}
        resp = requests.get('http://dronebl.org/lookup?ip=%s' % ip, proxies=via)
        if 'No incidents regarding' in resp.text:
            return 0
        return 1

    def _protos_to_try(self):
        """Return the list of protocols connect_socket() should attempt."""
        if not self.proto:
            return list(self.DEFAULT_PROTOS)
        # A known protocol is a single string; wrap it so the caller iterates
        # over protocols rather than over the string's characters.
        return [self.proto]

    def connect_socket(self):
        """Open a connection through the proxy and send the probe line.

        Returns:
            (sock, proto, started, torhost, srv) for the first protocol that
            works, where ``started`` is the time.time() value taken just
            before the attempt; a tuple of five None values if none works.
        """
        for proto in self._protos_to_try():
            torhost = random.choice(config.torhosts)
            started = time.time()
            chain = [
                rocksock.RocksockProxyFromURL('socks4://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s' % (proto, self.proxy)),
            ]
            srv = random.choice(config.servers).strip()
            # Bound before the try so the error path never touches an
            # unbound name (or the socket of a previous iteration).
            sock = None
            try:
                sock = rocksock.Rocksock(host=srv, port=6697, ssl=True, proxies=chain, timeout=config.timeout)
                sock.connect()
                sock.send('%s\n' % random.choice(self.PROBE_COMMANDS))
                return sock, proto, started, torhost, srv
            except KeyboardInterrupt:
                raise
            except Exception:
                if sock is not None:
                    sock.disconnect()
        return None, None, None, None, None

    def run(self):
        """Perform the check and update the job's bookkeeping attributes.

        On failure to connect, or on an error while reading, failcount is
        incremented and nextcheck keeps the back-off value computed up
        front. On a recognised reply the job is marked good and rescheduled
        in 1800 seconds. A reply that does not match leaves failcount
        untouched; only the back-off nextcheck applies.
        """
        # Back-off: half an hour plus one more hour per recorded failure.
        self.nextcheck = (time.time() + 1800 + ((1 + int(self.failcount)) * 3600))
        sock, proto, started, tor, srv = self.connect_socket()
        if not sock:
            self.failcount += 1
            return
        try:
            recv = sock.recv(6)
            # good data
            if self.GOOD_REPLY.match(recv):
                duration = time.time() - started
                self.nextcheck = time.time() + 1800
                # Country lookup (geoip) and the dronebl check are disabled.
                country = 'unknown'
                self.proto = proto
                self.duration = duration
                self.failcount = 0
                self.success_count += 1
                self.total_duration += duration * 1000
                _log('%s://%s; c: %s; d: %d sec(s); tor: %s; srv: %s; recv: %s' % (proto, self.proxy, country, duration, tor, srv, recv), 'xxxxx')
        except KeyboardInterrupt:
            raise
        except Exception:
            self.failcount += 1
        finally:
            sock.disconnect()
class WorkerThread():
    """A worker that runs queued jobs and keeps the finished ones for pickup.

    Jobs are any objects with a ``run()`` method. The worker can run in its
    own thread (start_thread) or synchronously in the caller's thread by
    calling workloop() directly; in the latter case workloop() returns as
    soon as the queue is empty.
    """

    def __init__ (self, id):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.workqueue = []
        self.workdone = []
        # Guards workdone: collect() runs in another thread than workloop(),
        # and an append landing between "read" and "reset" would be lost.
        self._done_lock = threading.Lock()

    def stop(self):
        """Ask the work loop to exit after the job it is currently running."""
        self.done.set()

    def term(self):
        """Wait for the worker thread to finish, if one was started."""
        if self.thread:
            self.thread.join()

    def add_jobs(self, jobs):
        """Append ``jobs`` to the queue of pending work."""
        self.workqueue.extend(jobs)

    def jobcount(self):
        """Return the number of jobs still waiting in the queue."""
        return len(self.workqueue)

    def collect(self):
        """Return the jobs finished since the last call and forget them."""
        with self._done_lock:
            finished, self.workdone = self.workdone, []
        return finished

    def start_thread(self):
        """Run workloop() in a new thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Run queued jobs (last queued first) until stopped.

        Without a thread the loop ends once the queue is empty; with a
        thread it keeps polling every 10 ms until stop() is called.
        """
        while True:
            if self.workqueue:
                job = self.workqueue.pop()
                job.run()
                with self._done_lock:
                    self.workdone.append(job)
            elif not self.thread:
                break
            if self.done.is_set():
                break
            time.sleep(0.01)
        if self.thread:
            _log("thread terminated", self.id)
class Proxywatchd():
    """Watchdog that periodically re-checks the proxies stored in the database.

    It loads due proxies from the ``proxylist`` table, hands them to
    ``config.watchd_threads`` WorkerThread objects as WorkerJob instances,
    collects the finished jobs and writes their results back in batches.
    """

    def __init__(self):
        config.load()
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()
        self.mysqlite = None
        # create table if needed
        self._prep_db()
        try:
            self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT, success_count INT, total_duration INT)')
            self.mysqlite.commit()
        finally:
            self._close_db()
        self.submit_after = 200 # number of collected jobs before writing db
        self.jobs = []
        self.collected = []

    def stop(self):
        """Request the main loop in _run() to terminate."""
        _log('Requesting proxywatchd to halt (%d thread(s))' % len(self.threads))
        self.stopping.set()

    def _cleanup(self):
        """Stop and join all workers, flush their results, signal 'stopped'."""
        for wt in self.threads:
            wt.stop()
        for wt in self.threads:
            wt.term()
        self.collect_work()
        self.submit_collected()
        self.stopped.set()

    def finish(self):
        """Block until shutdown is complete.

        In foreground mode the cleanup is done here; in background mode the
        _run() thread does it once it notices the stop request.
        """
        if not self.in_background:
            self._cleanup()
        while not self.stopped.is_set():
            time.sleep(0.1)

    def _prep_db(self):
        """Open the database configured in ``config.database``."""
        self.mysqlite = mysqlite.mysqlite(config.database, str)

    def _close_db(self):
        """Close the database handle if one is open."""
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None

    def prepare_jobs(self):
        """Queue a WorkerJob for every proxy that is due for a check.

        Due means ``failed`` below ``config.maxfail`` and ``tested`` earlier
        than the current time; rows are taken in random order.
        """
        self._prep_db()
        try:
            q = 'SELECT proxy,proto,failed,success_count,total_duration FROM proxylist WHERE failed<? and tested<? ORDER BY RANDOM()'
            rows = self.mysqlite.execute(q, (config.maxfail, time.time())).fetchall()
        finally:
            # Release the handle even when the query fails.
            self._close_db()
        for proxy, proto, failed, success_count, total_duration in rows:
            self.jobs.append(WorkerJob(proxy, proto, failed, success_count, total_duration))

    def collect_work(self):
        """Move the finished jobs of all workers into ``self.collected``."""
        for wt in self.threads:
            self.collected.extend(wt.collect())

    @staticmethod
    def _update_args(job):
        """Return the UPDATE parameters for ``job`` in column order.

        Order: failed, tested, dronebl, country, proto, duration,
        success_count, total_duration, proxy. dronebl and country are
        written as the fixed values 1 and 'unknown'.
        """
        return (job.failcount, job.nextcheck, 1, 'unknown', job.proto,
                job.duration, job.success_count, job.total_duration, job.proxy)

    @staticmethod
    def _jobs_per_thread(total, threads):
        """Return ceil(total / threads) using integer arithmetic only."""
        return -(-total // threads)

    def submit_collected(self):
        """Write the results of all collected jobs to the database."""
        _log("watchd main thread: updating %d entries"%len(self.collected))
        self._prep_db()
        try:
            query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,duration=?,success_count=?,total_duration=? WHERE proxy=?'
            args = [self._update_args(job) for job in self.collected]
            self.mysqlite.executemany(query, args)
            self.mysqlite.commit()
        finally:
            self._close_db()
        # Only reached when the write succeeded, so nothing is dropped on error.
        self.collected = []

    def start(self):
        """Start the watchdog.

        Runs in the calling thread only for a standalone single-thread
        setup; otherwise the main loop is started in a background thread.
        """
        if config.watchd_threads == 1 and _run_standalone:
            return self._run()
        else:
            return self._run_background()

    def run(self):
        """In background mode, idle in the calling thread until interrupted."""
        if self.in_background:
            while 1:
                time.sleep(0.1)

    def _run_background(self):
        """Run the main loop in its own thread."""
        self.in_background = True
        t = threading.Thread(target=self._run)
        t.start()

    def _run(self):
        """Main loop: create workers, distribute jobs, collect and store results."""
        _log('Starting proxywatchd..', 'notice')
        for _ in range(config.watchd_threads):
            # ascii_letters exists on Python 2 and 3 (string.letters does not).
            threadid = ''.join([random.choice(string.ascii_letters) for _ in range(5)])
            wt = WorkerThread(threadid)
            if self.in_background:
                wt.start_thread()
            self.threads.append(wt)
        while True:
            if self.stopping.is_set():
                if self.in_background:
                    self._cleanup()
                break
            # Refill when a randomly sampled worker has run out of jobs.
            if random.choice(self.threads).jobcount() == 0:
                self.prepare_jobs()
                if self.jobs:
                    _log("watchd main: handing out %d jobs"%len(self.jobs))
                    jpt = self._jobs_per_thread(len(self.jobs), config.watchd_threads)
                    for tid in range(config.watchd_threads):
                        self.threads[tid].add_jobs(self.jobs[tid*jpt:tid*jpt+jpt])
                    self.jobs = []
            if not self.in_background: # single_thread scenario
                self.threads[0].workloop()
            self.collect_work()
            if len(self.collected) > self.submit_after:
                self.submit_collected()
            time.sleep(1)
if __name__ == '__main__':
    # Script entry point: mark the module as standalone, start the watchdog
    # and make sure it is stopped and drained again on the way out.
    _run_standalone = True
    config.load()
    watchd = Proxywatchd()
    try:
        watchd.start()
        watchd.run()
    except KeyboardInterrupt:
        # Propagate Ctrl-C after the finally clause has shut things down.
        raise
    finally:
        watchd.stop()
        watchd.finish()