From 95bafcacffa098fbb3fd615626c29da61d5cb541 Mon Sep 17 00:00:00 2001 From: Username Date: Sun, 21 Dec 2025 23:37:19 +0100 Subject: [PATCH] proxywatchd: add startup logging, fix geolocation error handling --- proxywatchd.py | 462 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 411 insertions(+), 51 deletions(-) diff --git a/proxywatchd.py b/proxywatchd.py index 9d702a5..0f12f41 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -6,19 +6,28 @@ import random import string import re import heapq +import signal +import os try: import Queue except ImportError: import queue as Queue try: import IP2Location - import os geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) geolite = True -except (ImportError, IOError): +except (ImportError, IOError, ValueError): geolite = False +try: + import pyasn + asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat")) + asn_enabled = True +except (ImportError, IOError, NameError): + asndb = None + asn_enabled = False + from config import Config import mysqlite @@ -32,6 +41,126 @@ config = Config() _run_standalone = False cached_dns = {} +# IP pattern for judge services (validates response contains valid IP in body) +IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' +# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain +HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)' + +# Patterns indicating judge is blocking the proxy (not a proxy failure) +# These should NOT count as proxy failures - retry with different judge +JUDGE_BLOCK_PATTERNS = [ + r'HTTP/1\.[01] 403', # Forbidden + r'HTTP/1\.[01] 429', # Too Many Requests + r'HTTP/1\.[01] 503', # Service Unavailable + r'captcha', # Captcha challenge + r'challenge', # Generic challenge + r'verify you are human', # Human verification + r'rate.?limit', # Rate limiting + r'too many requests', # Rate limiting + r'access.?denied', # Access denied + r'blocked', # Explicit block + r'Checking your browser', # Cloudflare JS challenge +] +JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE) + +# Check types: irc, http (header match), judges (body match), ssl (TLS handshake) +# Judge services - return IP in body (plain text, JSON, or HTML) +judges = { + # Plain text IP + 'api.ipify.org': IP_PATTERN, + 'icanhazip.com': IP_PATTERN, + 'checkip.amazonaws.com': IP_PATTERN, + 'ifconfig.me/ip': IP_PATTERN, + 'ipinfo.io/ip': IP_PATTERN, + 'ip.me': IP_PATTERN, + 'myip.dnsomatic.com': IP_PATTERN, + 'ident.me': IP_PATTERN, + 'ipecho.net/plain': IP_PATTERN, + 'wtfismyip.com/text': IP_PATTERN, + 'eth0.me': IP_PATTERN, + 'l2.io/ip': IP_PATTERN, + 'tnx.nl/ip': IP_PATTERN, + 'wgetip.com': IP_PATTERN, + 'curlmyip.net': IP_PATTERN, + # JSON responses (IP pattern matches within JSON) + 'httpbin.org/ip': IP_PATTERN, + 'ip-api.com/json': IP_PATTERN, + 'ipapi.co/json': IP_PATTERN, + 'ipwhois.app/json': IP_PATTERN, + # Header echo - for elite detection (check if proxy adds revealing headers) + 'httpbin.org/headers': IP_PATTERN, # returns request headers as JSON +} + + +class JudgeStats(): + """Track per-judge success/failure rates for reliability scoring. + + Judges that frequently block or rate-limit are temporarily avoided. + Stats decay over time to allow recovery. + """ + + def __init__(self, cooldown_seconds=300, block_threshold=3): + self.lock = threading.Lock() + self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} + self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges + self.block_threshold = block_threshold # consecutive blocks before cooldown + + def record_success(self, judge): + """Record successful judge response.""" + with self.lock: + if judge not in self.stats: + self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} + self.stats[judge]['success'] += 1 + # Reset block count on success + self.stats[judge]['block'] = 0 + + def record_failure(self, judge): + """Record judge failure (proxy failed, not judge block).""" + with self.lock: + if judge not in self.stats: + self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} + self.stats[judge]['fail'] += 1 + + def record_block(self, judge): + """Record judge blocking the proxy (403, captcha, rate-limit).""" + with self.lock: + if judge not in self.stats: + self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} + self.stats[judge]['block'] += 1 + self.stats[judge]['last_block'] = time.time() + + def is_available(self, judge): + """Check if judge is available (not in cooldown).""" + with self.lock: + if judge not in self.stats: + return True + s = self.stats[judge] + # Check if in cooldown period + if s['block'] >= self.block_threshold: + if (time.time() - s['last_block']) < self.cooldown_seconds: + return False + # Cooldown expired, reset block count + s['block'] = 0 + return True + + def get_available_judges(self, judge_list): + """Return list of judges not in cooldown.""" + return [j for j in judge_list if self.is_available(j)] + + def status_line(self): + """Return status summary for logging.""" + with self.lock: + total = len(self.stats) + blocked = sum(1 for s in self.stats.values() + if s['block'] >= self.block_threshold and + (time.time() - s['last_block']) < self.cooldown_seconds) + return 'judges: %d total, %d in cooldown' % (total, blocked) + + +# Global judge stats instance +judge_stats = JudgeStats() + +# HTTP targets - check for specific headers regexes = { 'www.facebook.com': 'X-FB-Debug', 'www.fbcdn.net': 'X-FB-Debug', @@ -71,6 +200,28 @@ regexes = { 'www.time.com': 'x-amz-cf-pop' } +# SSL targets - verify TLS handshake only (MITM detection) +# No HTTP request needed - just connect with verifycert=True +ssl_targets = [ + 'www.google.com', + 'www.microsoft.com', + 'www.apple.com', + 'www.amazon.com', + 'www.cloudflare.com', + 'www.github.com', + 'www.mozilla.org', + 'www.wikipedia.org', + 'www.reddit.com', + 'www.twitter.com', + 'x.com', + 'www.facebook.com', + 'www.linkedin.com', + 'www.paypal.com', + 'www.stripe.com', + 'www.digicert.com', + 'www.letsencrypt.org', +] + class Stats(): """Track and report runtime statistics.""" @@ -302,7 +453,7 @@ class ProxyTestState(): When all tests complete, evaluate() determines final pass/fail. """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, - country, mitm, consecutive_success, num_targets=3, oldies=False): + country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) @@ -314,6 +465,7 @@ class ProxyTestState(): self.country = country self.mitm = mitm self.consecutive_success = consecutive_success + self.asn = asn self.isoldies = oldies self.num_targets = num_targets @@ -322,8 +474,10 @@ class ProxyTestState(): self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False self.last_latency_ms = None # average latency from successful tests + self.exit_ip = None # IP seen by target server (for anonymity detection) + self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers - def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None): + def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. Thread-safe.""" with self.lock: self.results.append({ @@ -333,7 +487,9 @@ class ProxyTestState(): 'srv': srv, 'tor': tor, 'ssl': ssl, - 'category': category + 'category': category, + 'exit_ip': exit_ip, + 'reveals_headers': reveals_headers }) def is_complete(self): @@ -375,17 +531,32 @@ class ProxyTestState(): if cats: fail_category = max(cats.keys(), key=lambda k: cats[k]) - # require majority success (2/3) - if num_success >= 2: + # require success (single target mode) + if num_success >= 1: # use last successful result for metrics last_good = successes[-1] + # Extract exit IP and header info from successful judge responses + for s in successes: + if s.get('exit_ip'): + self.exit_ip = s['exit_ip'] + if s.get('reveals_headers') is not None: + self.reveals_headers = s['reveals_headers'] + if geolite and self.country is None: self.ip = self.rwip(self.ip) rec = geodb.get_all(self.ip) if rec is not None and rec.country_short: self.country = rec.country_short + if asn_enabled and self.asn is None: + try: + asn_result = asndb.lookup(self.ip) + if asn_result and asn_result[0]: + self.asn = asn_result[0] + except Exception: + pass + self.proto = last_good['proto'] self.failcount = 0 if (self.consecutive_success % 3) == 0: @@ -400,23 +571,40 @@ class ProxyTestState(): self.last_latency_ms = sum(durations) * 1000.0 / len(durations) torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor'] - _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % ( + # Determine anonymity for status message + # transparent: exit_ip == proxy_ip + # anonymous: exit_ip != proxy_ip, adds revealing headers + # elite: exit_ip != proxy_ip, no revealing headers + anon_status = '' + if self.exit_ip: + if self.exit_ip == self.ip: + anon_status = ' anon: transparent;' + elif self.reveals_headers is False: + anon_status = ' anon: elite;' + elif self.reveals_headers is True: + anon_status = ' anon: anonymous;' + else: + anon_status = ' anon: anonymous;' # default if no header check + _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % ( last_good['proto'], self.ip, self.port, self.country, - last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']), + last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']), num_success, self.num_targets), 'xxxxx') return (True, None) - elif num_success == 1: - # partial success - don't increment fail, but reset consecutive - self.consecutive_success = 0 - _log('%s:%d partial success %d/%d targets' % ( - self.ip, self.port, num_success, self.num_targets), 'debug') - return (False, fail_category) - else: - self.failcount += 1 - self.consecutive_success = 0 - return (False, fail_category) + # Check if failures were all judge blocks (not proxy's fault) + judge_blocks = [f for f in failures if f.get('category') == 'judge_block'] + real_failures = [f for f in failures if f.get('category') != 'judge_block'] + + if judge_blocks and not real_failures: + # All failures were judge blocks - inconclusive, don't penalize proxy + # checktime still updated so we don't immediately retest + return (False, 'judge_block') + else: + # Real proxy failure + self.failcount += 1 + self.consecutive_success = 0 + return (False, fail_category) class TargetTestJob(): @@ -436,21 +624,64 @@ class TargetTestJob(): sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() if not sock: - self.proxy_state.record_result(False, category=err_cat) - return - - try: - recv = sock.recv(-1) - regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv] - - if re.search(regex, recv, re.IGNORECASE): + # SSL-only check passed (handshake success, no request needed) + if err_cat == 'ssl_ok': elapsed = time.time() - duration self.proxy_state.record_result( True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl ) else: - self.proxy_state.record_result(False, category='other') + self.proxy_state.record_result(False, category=err_cat) + return + + try: + recv = sock.recv(-1) + + # Select regex based on check type + if self.checktype == 'irc': + regex = '^(:|NOTICE|ERROR)' + elif self.checktype == 'judges': + regex = judges[srv] + elif self.checktype == 'ssl': + # Should not reach here - ssl returns before recv + self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl) + return + else: # http + regex = regexes[srv] + + if re.search(regex, recv, re.IGNORECASE): + elapsed = time.time() - duration + # Extract exit IP from judge response + exit_ip = None + reveals_headers = None + if self.checktype == 'judges': + ip_match = re.search(IP_PATTERN, recv) + if ip_match: + exit_ip = ip_match.group(0) + # Check for header echo judge (elite detection) + if 'headers' in srv: + # If X-Forwarded-For/Via/etc present, proxy reveals chain + reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE)) + # Record successful judge + judge_stats.record_success(srv) + self.proxy_state.record_result( + True, proto=proto, duration=elapsed, + srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip, + reveals_headers=reveals_headers + ) + else: + # Check if judge is blocking us (not a proxy failure) + if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv): + judge_stats.record_block(srv) + # Don't count as proxy failure - judge is blocking + self.proxy_state.record_result(False, category='judge_block') + if config.watchd.debug: + _log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug') + else: + if self.checktype == 'judges': + judge_stats.record_failure(srv) + self.proxy_state.record_result(False, category='other') except KeyboardInterrupt as e: sock.disconnect() @@ -464,17 +695,31 @@ class TargetTestJob(): """Connect to target through the proxy and send test packet.""" ps = self.proxy_state srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv - - use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl - if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0: - use_ssl = 1 - - if self.checktype == 'irc': - server_port = 6697 if use_ssl else 6667 + # For judges, extract host from 'host/path' format + if self.checktype == 'judges' and '/' in srvname: + connect_host = srvname.split('/')[0] else: - server_port = 443 if use_ssl else 80 + connect_host = srvname - verifycert = True if use_ssl else False + # SSL checktype: always use SSL with certificate verification + if self.checktype == 'ssl': + use_ssl = 1 + ssl_only_check = True # handshake only, no HTTP request + server_port = 443 + verifycert = True + else: + use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl + ssl_only_check = False # minimal SSL test (handshake only, no request) + if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0: + use_ssl = 1 + ssl_only_check = True # periodic MITM check - handshake is sufficient + + if self.checktype == 'irc': + server_port = 6697 if use_ssl else 6667 + else: + server_port = 443 if use_ssl else 80 + + verifycert = True if use_ssl else False protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] last_error_category = None @@ -487,9 +732,9 @@ class TargetTestJob(): else: torhost = random.choice(config.torhosts) if proto == 'socks4': - srv = socks4_resolve(srvname, server_port) + srv = socks4_resolve(connect_host, server_port) else: - srv = srvname + srv = connect_host if not srv: continue @@ -504,9 +749,24 @@ class TargetTestJob(): proxies=proxies, timeout=config.watchd.timeout, verifycert=verifycert) sock.connect() + + # SSL-only check: handshake passed, no request needed + if ssl_only_check: + elapsed = time.time() - duration + if pool: + pool.record_success(torhost, elapsed) + sock.disconnect() + return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_ok' + if self.checktype == 'irc': sock.send('NICK\n') - else: + elif self.checktype == 'judges': + # GET request to receive body with IP address + sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % ( + srvname.split('/', 1)[1] if '/' in srvname else '', + srvname.split('/')[0] + )) + else: # http - HEAD is sufficient for header checks sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) # Record success in pool if pool: @@ -537,7 +797,7 @@ class TargetTestJob(): _log("could not connect to tor, sleep 5s", "ERROR") time.sleep(5) elif et == rocksock.RS_ET_GAI: - _log("could not resolve connection target %s" % srvname, "ERROR") + _log("could not resolve connection target %s" % connect_host, "ERROR") break elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: ps.mitm = 1 @@ -649,6 +909,60 @@ class Proxywatchd(): target_queue_per_thread=10 ) self.thread_id_counter = 0 + self.last_jobs_log = 0 + self.last_update_log = 0 + self.log_interval = 60 # seconds between routine log messages + + # Register SIGHUP handler for config reload + signal.signal(signal.SIGHUP, self._handle_sighup) + + def _handle_sighup(self, signum, frame): + """Handle SIGHUP signal for config reload.""" + _log('received SIGHUP, reloading config...', 'watchd') + self.reload_config() + + def reload_config(self): + """Reload configuration without restart. + + Hot-reloadable settings: + threads, timeout, checktime, perfail_checktime, submit_after, + stats_interval, debug, tor_safeguard, stale_days, max_fail, + outage_threshold + + Requires restart: + database, source_file, checktype + """ + old_threads = config.watchd.threads + old_checktype = config.watchd.checktype + + try: + config.load() + errors = config.validate() + if errors: + for e in errors: + _log('config error: %s' % e, 'error') + _log('config reload failed, keeping old config', 'error') + return False + except Exception as e: + _log('config reload failed: %s' % str(e), 'error') + return False + + # Update runtime values + self.submit_after = config.watchd.submit_after + + # Update scaler limits + self.scaler.min_threads = max(2, config.watchd.threads // 4) + self.scaler.max_threads = config.watchd.threads * 2 + + # Warn about values requiring restart + if config.watchd.checktype != old_checktype: + _log('checktype changed (%s -> %s), requires restart to take effect' % ( + old_checktype, config.watchd.checktype), 'warn') + + _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % ( + config.watchd.threads, config.watchd.timeout, + config.watchd.checktime, config.watchd.max_fail), 'watchd') + return True def _spawn_thread(self): """Spawn a new worker thread.""" @@ -690,8 +1004,16 @@ class Proxywatchd(): def fetch_rows(self): self.isoldies = False - q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' - rows = self.mysqlite.execute(q, (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, time.time())).fetchall() + q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' + now = time.time() + params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) + if config.watchd.debug: + _log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % ( + config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime, + config.watchd.max_fail, int(now)), 'debug') + rows = self.mysqlite.execute(q, params).fetchall() + if config.watchd.debug: + _log('fetch_rows: got %d rows' % len(rows), 'debug') # check oldies ? if len(rows) < config.watchd.threads: rows = [] @@ -708,12 +1030,23 @@ class Proxywatchd(): self.tor_safeguard = config.watchd.tor_safeguard rows = self.fetch_rows() checktype = config.watchd.checktype - num_targets = 3 + num_targets = 1 # select target pool based on checktype if checktype == 'irc': target_pool = config.servers - else: + elif checktype == 'judges': + # Filter out judges in cooldown (blocked/rate-limited) + all_judges = list(judges.keys()) + target_pool = judge_stats.get_available_judges(all_judges) + if not target_pool: + # All judges in cooldown - use all anyway + target_pool = all_judges + if config.watchd.debug: + _log('all judges in cooldown, using full list', 'debug') + elif checktype == 'ssl': + target_pool = ssl_targets + else: # http target_pool = list(regexes.keys()) # create all jobs first, then shuffle for interleaving @@ -724,7 +1057,7 @@ class Proxywatchd(): # create shared state for this proxy state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], - row[6], row[7], row[8], num_targets=num_targets, + row[6], row[7], row[8], asn=row[9], num_targets=num_targets, oldies=self.isoldies ) new_states.append(state) @@ -756,9 +1089,11 @@ class Proxywatchd(): self._close_db() proxy_count = len(new_states) job_count = len(all_jobs) - if proxy_count > 0: + now = time.time() + if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval: _log("created %d jobs for %d proxies (%d targets each)" % ( job_count, proxy_count, num_targets), 'watchd') + self.last_jobs_log = now return job_count def collect_work(self): @@ -809,7 +1144,7 @@ class Proxywatchd(): latency_updates.append((job.proxy, job.last_latency_ms)) args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.proxy)) + job.consecutive_success, job.asn, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True @@ -823,20 +1158,28 @@ class Proxywatchd(): if job.failcount == 0: args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.proxy)) + job.consecutive_success, job.asn, job.proxy)) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) ret = False - _log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd') + now = time.time() + if (now - self.last_update_log) >= self.log_interval or success_rate > 0: + _log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd') + self.last_update_log = now self._prep_db() - query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?' + query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' self.mysqlite.executemany(query, args) # Update latency metrics for successful proxies for proxy, latency_ms in latency_updates: dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms) + # Update anonymity for proxies with exit IP data + for job in self.collected: + if job.failcount == 0 and job.exit_ip: + dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers) + self.mysqlite.commit() self._close_db() self.collected = [] @@ -878,6 +1221,20 @@ class Proxywatchd(): def _run(self): _log('starting...', 'watchd') + _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % ( + config.watchd.database, config.watchd.checktype, config.watchd.threads, + config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd') + + # Log database status at startup + self._prep_db() + total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + now = time.time() + due = self.mysqlite.execute( + 'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?', + (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) + ).fetchone()[0] + self._close_db() + _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd') # Initialize Tor connection pool connection_pool.init_pool(config.torhosts, warmup=True) @@ -942,6 +1299,9 @@ class Proxywatchd(): pool = connection_pool.get_pool() if pool: _log(pool.status_line(), 'stats') + # Report judge stats (when using judges checktype) + if config.watchd.checktype == 'judges': + _log(judge_stats.status_line(), 'stats') # Report scaler status _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')