# Reviewed rendering of the post-change code carried by this patch to
# proxywatchd.py.  Lines removed by the patch (the old WorkerJob class and the
# per-thread work lists) are gone.  Statements that sit between diff hunks
# were never visible; every such gap is marked with NOTE(review).

import threading
import time, random, string, re, copy
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module

# NOTE(review): the optional IP2Location import block, socks4_resolve() and
# the module-level names used below (config, rocksock, regexes, geolite,
# geodb, _log, try_div) are defined outside the visible hunks.


class ProxyTestState():
    """Thread-safe state for one proxy that is tested against several targets.

    Every TargetTestJob sharing this object reports its outcome through
    record_result().  Once num_targets outcomes have arrived, evaluate()
    turns them into one pass/fail verdict and updates the counters stored
    on this object.
    """

    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, num_targets=3, oldies=False):
        """Store the proxy's current record and prepare result accumulation.

        num_targets is the number of results is_complete() waits for; it
        must equal the number of TargetTestJob objects created for this
        state.
        """
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        # NOTE(review): six lines of this method sit between two diff hunks.
        # The assignments below are inferred from the attributes the rest of
        # the file reads (proto, failcount, success_count, total_duration,
        # country, checktime) - confirm against the full file.
        self.proto = proto
        self.failcount = failcount
        self.checktime = None
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.isoldies = oldies
        self.num_targets = num_targets

        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []  # one dict per finished target test, see record_result()
        self.completed = False
        self.passed = False  # verdict cached by the first evaluate() call

    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None):
        """Record the outcome of a single target test.  Thread-safe.

        success  -- whether the target answered as expected
        proto    -- proxy protocol the connection was made with
        duration -- elapsed seconds of the test
        srv      -- target server name
        tor      -- tor host the connection was routed through
        ssl      -- whether SSL was used for the test
        """
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl
            })

    def is_complete(self):
        """Return True once at least num_targets results were recorded."""
        with self.lock:
            return len(self.results) >= self.num_targets

    def rwip(self, ip):
        """Return ip rebuilt from its dot-separated parts."""
        n = []
        # NOTE(review): the two lines between `n = []` and `n.append(b)` fall
        # between diff hunks.  They are presumably a split on '.' plus removal
        # of leading zeros from each part - confirm against the full file.
        for b in ip.split('.'):
            while b[0] == '0' and len(b) > 1: b = b[1:]
            n.append(b)
        return '.'.join(n)

    def _required_successes(self):
        """Return how many targets must succeed: a strict majority of num_targets."""
        return self.num_targets // 2 + 1

    def evaluate(self):
        """Fold the recorded results into a verdict.  Returns True if the proxy passed.

        The first call updates checktime, failcount, consecutive_success,
        success_count, total_duration, proto, mitm and (if unset) country.
        Later calls change nothing and return the verdict of the first call.
        """
        with self.lock:
            if self.completed:
                # Return the stored verdict.  Deriving it from failcount would
                # turn an earlier "partial success" (False) into True.
                return self.passed
            self.completed = True
            self.checktime = int(time.time())

            successes = [r for r in self.results if r['success']]
            num_success = len(successes)

            # A strict majority is required; this is 2 for the default of 3.
            if num_success >= self._required_successes():
                # use last successful result for metrics
                last_good = successes[-1]

                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short

                self.proto = last_good['proto']
                self.failcount = 0
                if (self.consecutive_success % 3) == 0:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)

                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']),
                    num_success, self.num_targets), 'xxxxx')
                self.passed = True
                return True

            if num_success > 0:
                # partial success - don't increment fail, but reset consecutive
                self.consecutive_success = 0
                _log('%s:%d partial success %d/%d targets' % (
                    self.ip, self.port, num_success, self.num_targets), 'debug')
                return False

            self.failcount += 1
            self.consecutive_success = 0
            return False


class TargetTestJob():
    """Job that tests a single proxy against a single target.

    Several TargetTestJob instances share one ProxyTestState, so the tests
    of one proxy can be interleaved with those of other proxies in the queue.
    """

    def __init__(self, proxy_state, target_srv, checktype):
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        self.checktype = checktype

    def run(self):
        """Test the proxy against this job's target and record the outcome."""
        sock, proto, started, tor, srv, _failinc, is_ssl = self._connect_and_test()

        if not sock:
            self.proxy_state.record_result(False)
            return

        try:
            recv = sock.recv(-1)
            regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv]

            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - started
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            else:
                self.proxy_state.record_result(False)
        except rocksock.RocksockException:
            self.proxy_state.record_result(False)
        finally:
            # Runs on every exit path, KeyboardInterrupt included, so the
            # socket is released exactly once.
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to the target through tor and the proxy, then send a probe.

        Returns (sock, proto, start_time, torhost, srvname, fail_inc, use_ssl).
        When no protocol could connect, sock and the next four fields are
        None and fail_inc is 1.
        """
        ps = self.proxy_state
        is_irc = self.checktype == 'irc'
        srvname = self.target_srv.strip() if is_irc else self.target_srv

        use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
        # every third consecutive success forces a check over SSL
        if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
            use_ssl = 1

        if is_irc:
            server_port = 6697 if use_ssl else 6667
        else:
            server_port = 443 if use_ssl else 80

        verifycert = True if use_ssl else False
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]

        for proto in protos:
            torhost = random.choice(config.torhosts)
            # socks4 (without 4a) requires a raw ip address.  rocksock
            # resolves automatically if needed, but caching the result is
            # more efficient.
            if proto == 'socks4':
                srv = socks4_resolve(srvname, server_port)
            else:
                srv = srvname
            # skip socks4 when the resolution failed
            if not srv:
                continue

            duration = time.time()
            proxies = [
                rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
            ]

            # Reset per attempt so the handler never touches an unbound name
            # or the socket of the previous protocol.
            sock = None
            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies, timeout=config.watchd.timeout,
                                         verifycert=verifycert)
                sock.connect()
                if is_irc:
                    sock.send('NICK\n')
                else:
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                return sock, proto, duration, torhost, srvname, 0, use_ssl

            except rocksock.RocksockException as e:
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s" % (proto, ps.ip, ps.port,
                                                           e.get_errormessage()), 'debug')

                et = e.get_errortype()
                err = e.get_error()
                fp = e.get_failedproxy()

                if sock is not None:
                    sock.disconnect()

                if et == rocksock.RS_ET_OWN:
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                    err == rocksock.RS_E_HIT_TIMEOUT):
                        # proxy is not online, so don't waste time trying
                        # all possible protocols
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Log only a sample of these so the threads don't
                        # flood the log.
                        if random.randint(0, (config.watchd.threads - 1) // 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                        # NOTE(review): the collapsed source does not show
                        # whether this sleep belongs inside the `if` above;
                        # confirm against the full file.
                        time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % srvname, "ERROR")
                    break
                elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    ps.mitm = 1

        return None, None, None, None, None, 1, use_ssl


class WorkerThread():
    """Worker that pulls jobs from a shared queue and runs them."""

    def __init__(self, id, job_queue, result_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.job_queue = job_queue        # shared input queue
        self.result_queue = result_queue  # shared output queue

    def stop(self):
        """Ask workloop() to finish after its current job."""
        self.done.set()

    def term(self):
        """Wait for the worker thread, if one was started."""
        if self.thread: self.thread.join()

    def start_thread(self):
        """Run workloop() in a new thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Run queued jobs until stop() is called.

        When running as a thread the loop waits for new jobs.  When called
        directly (no thread was started) it returns as soon as the queue is
        empty, so the caller's own loop can continue.
        """
        job_count = 0
        duration_total = 0
        threaded = self.thread is not None
        while not self.done.is_set():
            try:
                # Block only in threaded mode.  The timeout lets a blocked
                # worker notice stop().
                job = self.job_queue.get(threaded, 0.5)
            except Queue.Empty:
                if threaded:
                    continue
                break
            try:
                nao = time.time()
                job.run()
                spent = time.time() - nao
                job_count += 1
                duration_total += spent
                self.result_queue.put(job)
            finally:
                # keep the queue's unfinished-task count right even if run() raised
                self.job_queue.task_done()
        if self.thread and job_count > 0:
            avg_t = try_div(duration_total, job_count)
            _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)


class Proxywatchd():
    """Daemon that schedules proxy tests and collects their results.

    NOTE(review): only the parts of this class touched by the patch are
    visible.  _cleanup(), submit_collected(), fetch_rows(), _prep_db(),
    _close_db() and the rest of the class are outside the hunks and are not
    reproduced here.
    """

    def stop(self):
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()

    def __init__(self):
        # NOTE(review): the def line and the first statements of this method
        # are outside the visible hunk.  It is presumably __init__; confirm
        # the signature and restore the hidden statements from the full file.
        self.stopping = threading.Event()
        self.stopped = threading.Event()

        # shared work-stealing queues
        self.job_queue = Queue.Queue()
        self.result_queue = Queue.Queue()

        # track pending proxy states (for multi-target aggregation)
        self.pending_states = []  # list of ProxyTestState awaiting completion
        self.pending_lock = threading.Lock()

        # create table if needed
        self._prep_db()
        self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, success_count INT, total_duration INT, ip TEXT, port INT)')
        # NOTE(review): one statement between the hunks is not visible here.
        self._close_db()

        self.submit_after = config.watchd.submit_after  # number of collected jobs before writing db
        self.collected = []  # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted': 0,
            'success': 0,
            # NOTE(review): the hunk ends inside this literal; any further
            # keys and the rest of the method are not visible.
        }

    def prepare_jobs(self):
        """Queue one TargetTestJob per (proxy, target) pair.  Returns the job count."""
        # NOTE(review): the def line and the statements before this point are
        # outside the visible hunk.  The name is taken from the
        # self.prepare_jobs() call in _run(); restore the hidden beginning
        # from the full file.

        ## enable tor safeguard by default
        self.tor_safeguard = config.watchd.tor_safeguard
        rows = self.fetch_rows()
        checktype = config.watchd.checktype
        num_targets = 3

        # select target pool based on checktype
        if checktype == 'irc':
            target_pool = config.servers
        else:
            target_pool = list(regexes.keys())

        # A proxy can only be tested against as many distinct targets as the
        # pool holds.  The state must expect exactly that many results, or
        # is_complete() would never become true.
        per_proxy = min(num_targets, len(target_pool))

        # create all jobs first, then shuffle for interleaving
        all_jobs = []
        new_states = []

        if per_proxy > 0:
            for row in rows:
                # create shared state for this proxy
                state = ProxyTestState(
                    row[0], row[1], row[2], row[3], row[4], row[5],
                    row[6], row[7], row[8], num_targets=per_proxy,
                    oldies=self.isoldies
                )
                new_states.append(state)

                # one job per randomly selected target
                for target in random.sample(target_pool, per_proxy):
                    all_jobs.append(TargetTestJob(state, target, checktype))
        else:
            _log("no test targets configured for checktype %s" % checktype, "ERROR")

        # shuffle to interleave tests across different proxies
        random.shuffle(all_jobs)

        # track pending states
        with self.pending_lock:
            self.pending_states.extend(new_states)

        # queue all jobs
        for job in all_jobs:
            self.job_queue.put(job)

        self._close_db()
        proxy_count = len(new_states)
        job_count = len(all_jobs)
        if proxy_count > 0:
            _log("created %d jobs for %d proxies (%d targets each)" % (
                job_count, proxy_count, per_proxy), 'watchd')
        return job_count

    def collect_work(self):
        """Move every fully tested proxy state from pending to collected."""
        # Drain the result queue.  The TargetTestJob objects in it have
        # already recorded their outcome in their ProxyTestState.
        while True:
            try:
                self.result_queue.get_nowait()
            except Queue.Empty:
                break

        # check for completed proxy states and evaluate them
        with self.pending_lock:
            still_pending = []
            for state in self.pending_states:
                if state.is_complete():
                    state.evaluate()
                    self.collected.append(state)
                else:
                    still_pending.append(state)
            self.pending_states = still_pending

    def collect_unfinished(self):
        """Discard all queued jobs and forget the states they belonged to."""
        orphaned = set()
        unfinished_count = 0
        while True:
            try:
                job = self.job_queue.get_nowait()
            except Queue.Empty:
                break
            self.job_queue.task_done()
            unfinished_count += 1
            orphaned.add(id(job.proxy_state))
        if unfinished_count > 0:
            # A state that lost a job can never complete.  Drop it so it is
            # not kept in the pending list forever.
            with self.pending_lock:
                self.pending_states = [
                    state for state in self.pending_states
                    if id(state) not in orphaned
                ]
            _log("discarded %d unfinished jobs" % unfinished_count, "watchd")

    def _run(self):
        """Main loop: start the workers, hand out jobs and submit results."""
        _log('starting...', 'watchd')

        # create worker threads with shared queues
        for _ in range(config.watchd.threads):
            threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
            wt = WorkerThread(threadid, self.job_queue, self.result_queue)
            if self.in_background:
                wt.start_thread()
            self.threads.append(wt)
            time.sleep(random.random() / 10)

        sleeptime = 0
        while True:

            if self.stopping.is_set():
                if self.in_background: self._cleanup()
                break

            if sleeptime > 0:
                time.sleep(1)
                sleeptime -= 1
                continue

            # check if job queue is empty (work-stealing: threads pull as needed)
            # NOTE(review): an empty queue does not mean the workers have
            # finished their current jobs.  Whether fetch_rows() can hand out
            # a proxy that is still pending cannot be seen from here.
            if self.job_queue.empty():
                self.collect_work()
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
                else:
                    job_count = self.prepare_jobs()
                    if job_count == 0:
                        # no jobs available, wait before checking again
                        sleeptime = 10

            if not self.in_background:  # single_thread scenario
                self.threads[0].workloop()

            self.collect_work()

            if len(self.collected) > self.submit_after:
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60

            time.sleep(1)


# NOTE(review): the module's `if __name__ == '__main__':` section follows in
# the original file; its body is outside the visible hunk.