diff --git a/proxywatchd.py b/proxywatchd.py index b5cd105..5a7ed08 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -8,13 +8,13 @@ try: import os geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) geolite = True -except: +except (ImportError, IOError): geolite = False from config import Config import mysqlite -from misc import _log +from misc import _log, categorize_error import rocksock config = Config() @@ -61,6 +61,48 @@ regexes = { 'www.time.com': 'x-amz-cf-pop' } + +class Stats(): + """Track and report runtime statistics.""" + + def __init__(self): + self.lock = threading.Lock() + self.tested = 0 + self.passed = 0 + self.failed = 0 + self.start_time = time.time() + self.last_report = time.time() + # Failure category tracking + self.fail_categories = {} + + def record(self, success, category=None): + with self.lock: + self.tested += 1 + if success: + self.passed += 1 + else: + self.failed += 1 + if category: + self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 + + def should_report(self, interval): + return (time.time() - self.last_report) >= interval + + def report(self): + with self.lock: + self.last_report = time.time() + elapsed = time.time() - self.start_time + rate = try_div(self.tested, elapsed) + pct = try_div(self.passed * 100.0, self.tested) + base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( + self.tested, self.passed, pct, rate, int(elapsed / 60)) + # Add failure breakdown if there are failures + if self.fail_categories: + cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) + return '%s [%s]' % (base, cats) + return base + + def try_div(a, b): if b != 0: return a/float(b) return 0 @@ -116,7 +158,7 @@ class ProxyTestState(): self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False - def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None): + def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None): """Record a single target test result. Thread-safe.""" with self.lock: self.results.append({ @@ -125,7 +167,8 @@ class ProxyTestState(): 'duration': duration, 'srv': srv, 'tor': tor, - 'ssl': ssl + 'ssl': ssl, + 'category': category }) def is_complete(self): @@ -141,16 +184,32 @@ class ProxyTestState(): return '.'.join(n) def evaluate(self): - """Evaluate results after all tests complete. Returns True if proxy is valid.""" + """Evaluate results after all tests complete. + + Returns: + (success, category) tuple where success is bool and category is + the dominant failure type (or None on success) + """ with self.lock: if self.completed: - return self.failcount == 0 + return (self.failcount == 0, None) self.completed = True self.checktime = int(time.time()) successes = [r for r in self.results if r['success']] + failures = [r for r in self.results if not r['success']] num_success = len(successes) + # Determine dominant failure category + fail_category = None + if failures: + cats = {} + for f in failures: + cat = f.get('category') or 'other' + cats[cat] = cats.get(cat, 0) + 1 + if cats: + fail_category = max(cats.keys(), key=lambda k: cats[k]) + # require majority success (2/3) if num_success >= 2: # use last successful result for metrics @@ -175,19 +234,19 @@ class ProxyTestState(): last_good['proto'], self.ip, self.port, self.country, last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']), num_success, self.num_targets), 'xxxxx') - return True + return (True, None) elif num_success == 1: # partial success - don't increment fail, but reset consecutive self.consecutive_success = 0 _log('%s:%d partial success %d/%d targets' % ( self.ip, self.port, num_success, self.num_targets), 'debug') - return False + return (False, fail_category) else: self.failcount += 1 self.consecutive_success = 0 - return False + return (False, fail_category) class TargetTestJob(): @@ -203,10 +262,10 @@ class TargetTestJob(): def run(self): """Test the proxy against this job's target server.""" - sock, proto, duration, tor, srv, failinc, is_ssl = self._connect_and_test() + sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() if not sock: - self.proxy_state.record_result(False) + self.proxy_state.record_result(False, category=err_cat) return try: @@ -220,13 +279,13 @@ class TargetTestJob(): srv=srv, tor=tor, ssl=is_ssl ) else: - self.proxy_state.record_result(False) + self.proxy_state.record_result(False, category='other') except KeyboardInterrupt as e: sock.disconnect() raise e - except rocksock.RocksockException: - self.proxy_state.record_result(False) + except rocksock.RocksockException as e: + self.proxy_state.record_result(False, category=categorize_error(e)) finally: sock.disconnect() @@ -246,6 +305,7 @@ class TargetTestJob(): verifycert = True if use_ssl else False protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] + last_error_category = None for proto in protos: torhost = random.choice(config.torhosts) @@ -271,12 +331,13 @@ class TargetTestJob(): sock.send('NICK\n') else: sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) - return sock, proto, duration, torhost, srvname, 0, use_ssl + return sock, proto, duration, torhost, srvname, 0, use_ssl, None except rocksock.RocksockException as e: + last_error_category = categorize_error(e) if config.watchd.debug: - _log("proxy failed: %s://%s:%d: %s" % (proto, ps.ip, ps.port, - e.get_errormessage()), 'debug') + _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, + e.get_errormessage(), last_error_category), 'debug') et = e.get_errortype() err = e.get_error() @@ -301,7 +362,7 @@ class TargetTestJob(): except KeyboardInterrupt as e: raise e - return None, None, None, None, None, 1, use_ssl + return None, None, None, None, None, 1, use_ssl, last_error_category class WorkerThread(): @@ -350,6 +411,8 @@ class Proxywatchd(): wt.term() self.collect_work() self.submit_collected() + if self.httpd_server: + self.httpd_server.stop() self.stopped.set() def finish(self): @@ -391,6 +454,9 @@ class Proxywatchd(): 'submitted':0, 'success':0, } + self.stats = Stats() + self.last_cleanup = time.time() + self.httpd_server = None def fetch_rows(self): self.isoldies = False @@ -474,7 +540,8 @@ class Proxywatchd(): still_pending = [] for state in self.pending_states: if state.is_complete(): - state.evaluate() + success, category = state.evaluate() + self.stats.record(success, category) self.collected.append(state) else: still_pending.append(state) @@ -524,6 +591,23 @@ class Proxywatchd(): self.totals['success'] += sc return ret + def cleanup_stale(self): + """Remove proxies that have been dead for too long.""" + stale_seconds = config.watchd.stale_days * 86400 + cutoff = int(time.time()) - stale_seconds + self._prep_db() + # delete proxies that: failed >= max_fail AND last tested before cutoff + result = self.mysqlite.execute( + 'DELETE FROM proxylist WHERE failed >= ? AND tested < ?', + (config.watchd.max_fail, cutoff) + ) + count = result.rowcount if hasattr(result, 'rowcount') else 0 + self.mysqlite.commit() + self._close_db() + if count > 0: + _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') + self.last_cleanup = time.time() + def start(self): if config.watchd.threads == 1 and _run_standalone: return self._run() @@ -542,6 +626,16 @@ class Proxywatchd(): def _run(self): _log('starting...', 'watchd') + # Start HTTP API server if enabled + if config.httpd.enabled: + from httpd import ProxyAPIServer + self.httpd_server = ProxyAPIServer( + config.httpd.listenip, + config.httpd.port, + config.watchd.database + ) + self.httpd_server.start() + # create worker threads with shared queues for i in range(config.watchd.threads): threadid = ''.join([random.choice(string.letters) for x in range(5)]) @@ -585,6 +679,14 @@ class Proxywatchd(): _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 + # periodic stats report + if self.stats.should_report(config.watchd.stats_interval): + _log(self.stats.report(), 'stats') + + # periodic stale proxy cleanup (daily) + if (time.time() - self.last_cleanup) >= 86400: + self.cleanup_stale() + time.sleep(1) @@ -592,6 +694,12 @@ if __name__ == '__main__': _run_standalone = True config.load() + errors = config.validate() + if errors: + for e in errors: + _log(e, 'error') + import sys + sys.exit(1) w = Proxywatchd() try: