#!/usr/bin/env python
"""Proxy watchdog.

Pulls proxies from the database, tests each one against several
well-known target hosts (routed through a tor hop plus the proxy under
test), aggregates the per-target results and writes the outcome back to
the database.  Python 2 codebase (uses Queue and string.letters).
"""
import threading
import time
import random
import string
import re
import copy
import Queue

try:
    import os
    import IP2Location
    # Optional GeoIP database; when the module or the data file is
    # missing we simply run without country resolution.
    geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
    geolite = True
except Exception:
    # narrowed from bare `except:` so KeyboardInterrupt/SystemExit pass through
    geolite = False

from config import Config
import mysqlite
from misc import _log
import rocksock

config = Config()
_run_standalone = False
# hostname -> resolved IPv4 string, shared cache for socks4 targets
cached_dns = dict()

# target host -> response header whose presence proves we reached the
# genuine site through the proxy (and not some interception page)
regexes = {
    'www.facebook.com': 'X-FB-Debug',
    'www.fbcdn.net': 'X-FB-Debug',
    'www.reddit.com': 'x-clacks-overhead',
    'www.twitter.com': 'x-connection-hash',
    't.co': 'x-connection-hash',
    'www.msn.com': 'x-aspnetmvc-version',
    'www.bing.com': 'p3p',
    'www.ask.com': 'x-served-by',
    'www.hotmail.com': 'x-msedge-ref',
    'www.bbc.co.uk': 'x-bbc-edge-cache-status',
    'www.skype.com': 'X-XSS-Protection',
    'www.alibaba.com': 'object-status',
    'www.mozilla.org': 'cf-ray',
    'www.cloudflare.com': 'cf-ray',
    'www.wikimedia.org': 'x-client-ip',
    'www.vk.com': 'x-frontend',
    'www.tinypic.com': 'x-amz-cf-pop',
    'www.netflix.com': 'X-Netflix.proxy.execution-time',
    'www.amazon.de': 'x-amz-cf-id',
    'www.reuters.com': 'x-amz-cf-id',
    'www.ikea.com': 'x-frame-options',
    'www.twitpic.com': 'timing-allow-origin',
    'www.digg.com': 'cf-request-id',
    'www.wikia.com': 'x-served-by',
    'www.wp.com': 'x-ac',
    'www.last.fm': 'x-timer',
    'www.usps.com': 'x-ruleset-version',
    'www.linkedin.com': 'x-li-uuid',
    'www.vimeo.com': 'x-timer',
    'www.yelp.com': 'x-timer',
    'www.ebay.com': 'x-envoy-upstream-service-time',
    'www.wikihow.com': 'x-c',
    'www.archive.org': 'referrer-policy',
    'www.pandora.tv': 'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}


def try_div(a, b):
    """Return a / b as a float, or 0 when b is zero (no ZeroDivisionError)."""
    if b != 0:
        return a / float(b)
    return 0


def socks4_resolve(srvname, server_port):
    """Resolve srvname to an IPv4 address for socks4 (no remote DNS there).

    Successful lookups are memoized in the module-level cached_dns dict.
    Returns the IP string, or False when resolution fails.
    """
    if srvname in cached_dns:
        srv = cached_dns[srvname]
        if config.watchd.debug:
            _log("using cached ip (%s) for %s" % (srv, srvname), "debug")
        return srv
    sa = None
    try:
        # treat any resolver exception as a lookup failure (previously an
        # `assert` checked the error type, which is stripped under -O)
        af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
    except rocksock.RocksockException:
        sa = None
    if sa is None:
        _log("could not resolve connection target %s" % srvname, "ERROR")
        return False
    cached_dns[srvname] = sa[0]
    return sa[0]


class ProxyTestState():
    """Thread-safe state for a proxy being tested against multiple targets.

    Results from TargetTestJob instances are aggregated here. When all
    tests complete, evaluate() determines final pass/fail.
    """

    def __init__(self, ip, port, proto, failcount, success_count,
                 total_duration, country, mitm, consecutive_success,
                 num_targets=3, oldies=False):
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        self.proto = proto
        self.failcount = failcount
        self.checktime = None          # set when evaluate() runs
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.isoldies = oldies
        self.num_targets = num_targets
        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []              # list of result dicts, see record_result()
        self.completed = False

    def record_result(self, success, proto=None, duration=0, srv=None,
                      tor=None, ssl=None):
        """Record a single target test result. Thread-safe."""
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl
            })

    def is_complete(self):
        """Check if all target tests have finished."""
        with self.lock:
            return len(self.results) >= self.num_targets

    def rwip(self, ip):
        """Strip leading zeros from each octet ('010.001.2.3' -> '10.1.2.3').

        Bug fix: the old loop compared a character against the int 0
        (never true) and kept b[:1] instead of dropping the first char,
        so it was a silent no-op.
        """
        return '.'.join(octet.lstrip('0') or '0' for octet in ip.split('.'))

    def evaluate(self):
        """Evaluate results after all tests complete.

        Returns True if proxy is valid."""
        with self.lock:
            if self.completed:
                # already evaluated; reproduce the previous verdict
                return self.failcount == 0
            self.completed = True
            self.checktime = int(time.time())
            successes = [r for r in self.results if r['success']]
            num_success = len(successes)
            # require majority success (2/3)
            if num_success >= 2:
                # use last successful result for metrics
                last_good = successes[-1]
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short
                self.proto = last_good['proto']
                self.failcount = 0
                # every third consecutive success the test was forced over
                # ssl (see _connect_and_test), so a clean pass clears mitm
                if (self.consecutive_success % 3) == 0:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)
                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, last_good['srv'],
                    str(last_good['ssl']), num_success, self.num_targets), 'xxxxx')
                return True
            elif num_success == 1:
                # partial success - don't increment fail, but reset consecutive
                self.consecutive_success = 0
                _log('%s:%d partial success %d/%d targets' % (
                    self.ip, self.port, num_success, self.num_targets), 'debug')
                return False
            else:
                self.failcount += 1
                self.consecutive_success = 0
                return False


class TargetTestJob():
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """

    def __init__(self, proxy_state, target_srv, checktype):
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        self.checktype = checktype

    def run(self):
        """Test the proxy against this job's target server."""
        sock, proto, start, tor, srv, failinc, is_ssl = self._connect_and_test()
        if not sock:
            self.proxy_state.record_result(False)
            return
        try:
            recv = sock.recv(-1)
            # irc servers answer with a numeric/NOTICE/ERROR line; http
            # targets are verified via their characteristic header
            regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv]
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - start
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv,
                    tor=tor, ssl=is_ssl
                )
            else:
                self.proxy_state.record_result(False)
        except rocksock.RocksockException:
            self.proxy_state.record_result(False)
        finally:
            # bug fix: the old KeyboardInterrupt handler disconnected
            # explicitly and then `finally` disconnected a second time;
            # the single finally covers every exit path (incl. KI).
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to the target through tor + the proxy and send a probe.

        Returns (sock, proto, start_time, torhost, srvname, failinc,
        use_ssl); sock is None when every protocol attempt failed.
        """
        ps = self.proxy_state
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
        # periodically force ssl so MITM detection gets a chance to run
        if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
            use_ssl = 1
        if self.checktype == 'irc':
            server_port = 6697 if use_ssl else 6667
        else:
            server_port = 443 if use_ssl else 80
        verifycert = True if use_ssl else False
        # unknown proxies get probed with every protocol; known ones only theirs
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        for proto in protos:
            torhost = random.choice(config.torhosts)
            if proto == 'socks4':
                # socks4 carries no hostname, resolve locally
                srv = socks4_resolve(srvname, server_port)
            else:
                srv = srvname
            if not srv:
                continue
            start = time.time()
            proxies = [
                rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
            ]
            sock = None
            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies,
                                         timeout=config.watchd.timeout,
                                         verifycert=verifycert)
                sock.connect()
                if self.checktype == 'irc':
                    sock.send('NICK\n')
                else:
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                return sock, proto, start, torhost, srvname, 0, use_ssl
            except rocksock.RocksockException as e:
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s" % (
                        proto, ps.ip, ps.port, e.get_errormessage()), 'debug')
                et = e.get_errortype()
                err = e.get_error()
                fp = e.get_failedproxy()
                # bug fix: sock is unbound if the constructor itself raised
                if sock is not None:
                    sock.disconnect()
                if et == rocksock.RS_ET_OWN:
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                    err == rocksock.RS_E_HIT_TIMEOUT):
                        # the proxy under test is dead; other protocols won't help
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # tor hop unreachable; randomly throttle some workers
                        if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                            time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % srvname, "ERROR")
                    break
                elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # cert did not verify: flag possible man-in-the-middle
                    ps.mitm = 1
            except KeyboardInterrupt as e:
                raise e
        return None, None, None, None, None, 1, use_ssl


class WorkerThread():
    """Worker pulling TargetTestJob items from a shared queue."""

    def __init__(self, id, job_queue, result_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.job_queue = job_queue        # shared input queue
        self.result_queue = result_queue  # shared output queue

    def stop(self):
        """Signal the work loop to exit after the current job."""
        self.done.set()

    def term(self):
        """Join the backing thread, if one was started."""
        if self.thread:
            self.thread.join()

    def start_thread(self):
        """Run workloop() on a dedicated thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Pull jobs until stopped; also callable inline (single-thread mode)."""
        job_count = 0
        duration_total = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            nao = time.time()
            job.run()
            spent = time.time() - nao
            job_count += 1
            duration_total += spent
            self.result_queue.put(job)
            self.job_queue.task_done()
        # only log stats for real threads that actually processed work
        if self.thread and job_count > 0:
            avg_t = try_div(duration_total, job_count)
            _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)


class Proxywatchd():
    """Main daemon: schedules proxy tests and persists the results."""

    def stop(self):
        """Request shutdown; _run()'s loop performs the actual cleanup."""
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()

    def _cleanup(self):
        """Stop and join all workers, flush pending results to the DB."""
        for wt in self.threads:
            wt.stop()
        for wt in self.threads:
            wt.term()
        self.collect_work()
        self.submit_collected()
        self.stopped.set()

    def finish(self):
        """Block until shutdown completed, then log overall statistics."""
        if not self.in_background:
            # foreground mode: _run() already returned, clean up here
            self._cleanup()
        while not self.stopped.is_set():
            time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)" % (
            self.totals['success'], self.totals['submitted'], success_rate), "watchd")

    def _prep_db(self):
        """Open a fresh database handle (short-lived, per operation)."""
        self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)

    def _close_db(self):
        """Close the database handle opened by _prep_db()."""
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None

    def __init__(self):
        config.load()
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()
        # shared work-stealing queues
        self.job_queue = Queue.Queue()
        self.result_queue = Queue.Queue()
        # track pending proxy states (for multi-target aggregation)
        self.pending_states = []   # ProxyTestState objects awaiting completion
        self.pending_lock = threading.Lock()
        # create table if needed; bug fix: mitm and consecutive_success
        # columns were missing although fetch_rows()/submit_collected()
        # SELECT and UPDATE them - a fresh database failed every query
        self._prep_db()
        self.mysqlite.execute(
            'CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, '
            'added INT, failed INT, tested INT, source BLOB, dronebl INT, '
            'proto TEXT, success_count INT, total_duration INT, ip TEXT, '
            'port INT, mitm INT, consecutive_success INT)')
        self.mysqlite.commit()
        self._close_db()
        # number of collected jobs before writing db
        self.submit_after = config.watchd.submit_after
        self.collected = []  # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted': 0,
            'success': 0,
        }

    def fetch_rows(self):
        """Fetch proxies due for a re-check; falls back to 'oldies' when idle.

        Returns database rows in ProxyTestState constructor order.
        """
        self.isoldies = False
        q = ('SELECT ip,port,proto,failed,success_count,total_duration,country,'
             'mitm,consecutive_success FROM proxylist WHERE failed >= ? and '
             'failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()')
        rows = self.mysqlite.execute(q, (
            0, config.watchd.max_fail, config.watchd.checktime,
            config.watchd.perfail_checktime, time.time())).fetchall()
        # check oldies ?
        if len(rows) < config.watchd.threads:
            rows = []
            if config.watchd.oldies:
                self.isoldies = True
                ## disable tor safeguard for old proxies
                if self.tor_safeguard:
                    self.tor_safeguard = False
                rows = self.mysqlite.execute(q, (
                    config.watchd.max_fail,
                    config.watchd.max_fail + round(config.watchd.max_fail / 2),
                    config.watchd.checktime, config.watchd.oldies_checktime,
                    time.time())).fetchall()
        return rows

    def prepare_jobs(self):
        """Build and enqueue one TargetTestJob per (proxy, target) pair.

        Returns the number of jobs queued.
        """
        self._prep_db()
        ## enable tor safeguard by default
        self.tor_safeguard = config.watchd.tor_safeguard
        rows = self.fetch_rows()
        checktype = config.watchd.checktype
        num_targets = 3
        # select target pool based on checktype
        if checktype == 'irc':
            target_pool = config.servers
        else:
            target_pool = list(regexes.keys())
        # create all jobs first, then shuffle for interleaving
        all_jobs = []
        new_states = []
        for row in rows:
            # create shared state for this proxy
            state = ProxyTestState(
                row[0], row[1], row[2], row[3], row[4], row[5], row[6],
                row[7], row[8], num_targets=num_targets, oldies=self.isoldies
            )
            new_states.append(state)
            # select random targets for this proxy
            targets = random.sample(target_pool, min(num_targets, len(target_pool)))
            # create one job per target
            for target in targets:
                all_jobs.append(TargetTestJob(state, target, checktype))
        # shuffle to interleave tests across different proxies
        random.shuffle(all_jobs)
        # track pending states
        with self.pending_lock:
            self.pending_states.extend(new_states)
        # queue all jobs
        for job in all_jobs:
            self.job_queue.put(job)
        self._close_db()
        proxy_count = len(new_states)
        job_count = len(all_jobs)
        if proxy_count > 0:
            _log("created %d jobs for %d proxies (%d targets each)" % (
                job_count, proxy_count, num_targets), 'watchd')
        return job_count

    def collect_work(self):
        """Drain finished jobs and move fully-tested proxies to self.collected."""
        # drain results from shared result queue (TargetTestJob objects);
        # results are already recorded in their ProxyTestState
        while True:
            try:
                self.result_queue.get_nowait()
            except Queue.Empty:
                break
        # check for completed proxy states and evaluate them
        with self.pending_lock:
            still_pending = []
            for state in self.pending_states:
                if state.is_complete():
                    state.evaluate()
                    self.collected.append(state)
                else:
                    still_pending.append(state)
            self.pending_states = still_pending

    def collect_unfinished(self):
        """Discard queued-but-unprocessed jobs (e.g. at shutdown)."""
        # drain any remaining jobs from job queue
        unfinished_count = 0
        while True:
            try:
                self.job_queue.get_nowait()
                unfinished_count += 1
            except Queue.Empty:
                break
        if unfinished_count > 0:
            _log("discarded %d unfinished jobs" % unfinished_count, "watchd")
        # note: corresponding ProxyTestStates will be incomplete
        # they'll be re-tested in the next cycle

    def submit_collected(self):
        """Write evaluated proxy states back to the DB.

        Returns False when the tor-outage safeguard suppressed failure
        updates (caller then backs off), True otherwise.
        """
        if len(self.collected) == 0:
            return True

        def _row(job):
            # tuple order must match the UPDATE statement below
            return (job.failcount, job.checktime, 1, job.country, job.proto,
                    job.success_count, job.total_duration, job.mitm,
                    job.consecutive_success, job.proxy)

        good = [job for job in self.collected if job.failcount == 0]
        sc = len(good)
        args = [_row(job) for job in self.collected]
        success_rate = (float(sc) / len(self.collected)) * 100
        ret = True
        if (len(self.collected) >= 100 and
                success_rate <= config.watchd.outage_threshold and
                self.tor_safeguard):
            # mass failure smells like a blocked tor circuit, not dead
            # proxies: only submit the successes, keep fails for a retry
            _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
            if sc == 0:
                return False
            args = [_row(job) for job in good]
            ret = False
        _log("updating %d DB entries (success rate: %.2f%%)" % (
            len(self.collected), success_rate), 'watchd')
        self._prep_db()
        query = ('UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,'
                 'proto=?,success_count=?,total_duration=?,mitm=?,'
                 'consecutive_success=? WHERE proxy=?')
        self.mysqlite.executemany(query, args)
        self.mysqlite.commit()
        self._close_db()
        self.collected = []
        self.totals['submitted'] += len(args)
        self.totals['success'] += sc
        return ret

    def start(self):
        """Run inline when standalone single-threaded, else in background."""
        if config.watchd.threads == 1 and _run_standalone:
            return self._run()
        else:
            return self._run_background()

    def run(self):
        """Keep the main thread alive while the daemon runs in background."""
        if self.in_background:
            while 1:
                time.sleep(1)

    def _run_background(self):
        """Start the main loop on a daemon-side thread."""
        self.in_background = True
        t = threading.Thread(target=self._run)
        t.start()

    def _run(self):
        """Main scheduling loop: spawn workers, refill queue, flush results."""
        _log('starting...', 'watchd')
        # create worker threads with shared queues
        for i in range(config.watchd.threads):
            threadid = ''.join([random.choice(string.letters) for x in range(5)])
            wt = WorkerThread(threadid, self.job_queue, self.result_queue)
            if self.in_background:
                wt.start_thread()
            self.threads.append(wt)
            # stagger startup slightly
            time.sleep(random.random() / 10)
        sleeptime = 0
        while True:
            if self.stopping.is_set():
                if self.in_background:
                    self._cleanup()
                break
            if sleeptime > 0:
                time.sleep(1)
                sleeptime -= 1
                continue
            # check if job queue is empty (work-stealing: threads pull as needed)
            if self.job_queue.empty():
                self.collect_work()
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
                else:
                    job_count = self.prepare_jobs()
                    if job_count == 0:
                        # no jobs available, wait before checking again
                        sleeptime = 10
            if not self.in_background:
                # single_thread scenario
                self.threads[0].workloop()
            self.collect_work()
            if len(self.collected) > self.submit_after:
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
            time.sleep(1)


if __name__ == '__main__':
    _run_standalone = True
    config.load()
    w = Proxywatchd()
    try:
        w.start()
        w.run()
    except KeyboardInterrupt as e:
        pass
    finally:
        w.stop()
        w.finish()