diff --git a/proxywatchd.py b/proxywatchd.py index 4ed860f..69b7cdb 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -44,7 +44,7 @@ import dns from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error import rocksock import connection_pool -from stats import JudgeStats, Stats, regexes, ssl_targets, try_div +from stats import TargetStats, JudgeStats, Stats, regexes, ssl_targets, try_div from mitm import MITMCertStats, extract_cert_info, get_mitm_certificate from dns import socks4_resolve from job import PriorityJobQueue, calculate_priority @@ -164,9 +164,9 @@ DEAD_PROXY = -1 # Error categories that indicate proxy is definitely dead (not temporary failure) FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth') -# Patterns indicating judge is blocking the proxy (not a proxy failure) -# These should NOT count as proxy failures - retry with different judge -JUDGE_BLOCK_PATTERNS = [ +# Patterns indicating HTTP target is blocking the proxy (not a proxy failure) +# These should NOT count as proxy failures - applies to judges and head targets +HTTP_BLOCK_PATTERNS = [ r'HTTP/1\.[01] 403', # Forbidden r'HTTP/1\.[01] 429', # Too Many Requests r'HTTP/1\.[01] 503', # Service Unavailable @@ -179,7 +179,7 @@ JUDGE_BLOCK_PATTERNS = [ r'blocked', # Explicit block r'Checking your browser', # Cloudflare JS challenge ] -JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE) +HTTP_BLOCK_RE = re.compile('|'.join(HTTP_BLOCK_PATTERNS), re.IGNORECASE) # Check types: irc, http (header match), judges (body match), ssl (TLS handshake) # Judge services - return IP in body (plain text, JSON, or HTML) @@ -213,6 +213,9 @@ judges = { # Global instances judge_stats = JudgeStats() +head_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) +ssl_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) +irc_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) mitm_cert_stats = MITMCertStats() @@ -411,18 +414,19 @@ class ProxyTestState(object): self.evaluated = True self.checktime = int(time.time()) - # Filter out judge_block results (inconclusive, neither pass nor fail) - real_results = [r for r in self.results if r.get('category') != 'judge_block'] + # Filter out target_block results (inconclusive, neither pass nor fail) + block_cats = ('judge_block', 'target_block') + real_results = [r for r in self.results if r.get('category') not in block_cats] successes = [r for r in real_results if r['success']] failures = [r for r in real_results if not r['success']] num_success = len(successes) - judge_blocks = len(self.results) - len(real_results) - _dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % ( - num_success, len(failures), judge_blocks, len(self.results)), self.proxy) + target_blocks = len(self.results) - len(real_results) + _dbg('evaluate: %d success, %d fail, %d target_block, results=%d' % ( + num_success, len(failures), target_blocks, len(self.results)), self.proxy) - # All results were judge blocks: inconclusive, preserve current state + # All results were target blocks: inconclusive, preserve current state if not real_results and self.results: - _dbg('all results inconclusive (judge_block), no state change', self.proxy) + _dbg('all results inconclusive (target_block), no state change', self.proxy) self.failcount = self.original_failcount return (self.original_failcount == 0, None) @@ -617,6 +621,10 @@ class TargetTestJob(object): reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE)) # Record successful judge judge_stats.record_success(srv) + elif self.checktype == 'head': + head_target_stats.record_success(srv) + elif self.checktype == 'irc': + irc_target_stats.record_success(srv) self.proxy_state.record_result( True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip, @@ -624,22 +632,28 @@ class TargetTestJob(object): ) else: _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy) - # Check if judge is blocking us (not a proxy failure) - if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv): - judge_stats.record_block(srv) - # Judge block = inconclusive, not a pass or fail - _dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy) + # Check if HTTP target is blocking us (not a proxy failure) + if self.checktype in ('judges', 'head') and HTTP_BLOCK_RE.search(recv): + if self.checktype == 'judges': + judge_stats.record_block(srv) + else: + head_target_stats.record_block(srv) + _dbg('target BLOCK detected, skipping (neutral)', self.proxy_state.proxy) self.proxy_state.record_result( - False, category='judge_block', proto=proto, + False, category='target_block', proto=proto, srv=srv, tor=tor, ssl=is_ssl ) if config.watchd.debug: - _log('judge %s challenged proxy %s (neutral, skipped)' % ( - srv, self.proxy_state.proxy), 'debug') + _log('%s %s challenged proxy %s (neutral, skipped)' % ( + self.checktype, srv, self.proxy_state.proxy), 'debug') else: _dbg('FAIL: no match, no block', self.proxy_state.proxy) if self.checktype == 'judges': judge_stats.record_failure(srv) + elif self.checktype == 'head': + head_target_stats.record_failure(srv) + elif self.checktype == 'irc': + irc_target_stats.record_failure(srv) self.proxy_state.record_result(False, category='other') except KeyboardInterrupt as e: @@ -823,7 +837,8 @@ class TargetTestJob(object): string (for secondary check SSL/plain decision). """ ps = self.proxy_state - ssl_target = random.choice(ssl_targets) + available_ssl = ssl_target_stats.get_available(ssl_targets) or ssl_targets + ssl_target = random.choice(available_ssl) last_error_category = None last_ssl_reason = None @@ -866,6 +881,7 @@ class TargetTestJob(object): elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) + ssl_target_stats.record_success(ssl_target) sock.disconnect() _dbg('SSL handshake OK', ps.proxy) return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'), None @@ -901,9 +917,17 @@ class TargetTestJob(object): # Check for Tor connection issues if et == rocksock.RS_ET_OWN: - if e.get_failedproxy() == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: + fp = e.get_failedproxy() + if fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: if pool: pool.record_failure(torhost) + elif fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or + err == rocksock.RS_E_HIT_TIMEOUT): + # Target-side failure + ssl_target_stats.record_failure(ssl_target) + elif et == rocksock.RS_ET_GAI: + # DNS failure -- target unresolvable + ssl_target_stats.record_block(ssl_target) except KeyboardInterrupt: raise @@ -1025,6 +1049,11 @@ class TargetTestJob(object): if et == rocksock.RS_ET_OWN: if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or err == rocksock.RS_E_HIT_TIMEOUT): + # Target-side failure -- proxy reached target but it's down + if self.checktype == 'head': + head_target_stats.record_failure(srvname) + elif self.checktype == 'irc': + irc_target_stats.record_failure(srvname) break elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: # Tor connection failed - record in pool @@ -1034,6 +1063,11 @@ class TargetTestJob(object): _log("could not connect to tor, sleep 5s", "ERROR") time.sleep(5) elif et == rocksock.RS_ET_GAI: + # DNS failure -- target hostname unresolvable (hard failure) + if self.checktype == 'head': + head_target_stats.record_block(connect_host) + elif self.checktype == 'irc': + irc_target_stats.record_block(srvname) _log("could not resolve connection target %s" % connect_host, "ERROR") break elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: @@ -1506,7 +1540,7 @@ class Proxywatchd(): _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes)) checktypes = config.watchd.checktypes - # Build target pools for each checktype + # Build target pools for each checktype (filter out targets in cooldown) target_pools = {} for ct in checktypes: if ct == 'none': @@ -1514,19 +1548,23 @@ class Proxywatchd(): target_pools[ct] = ssl_targets _dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets)) elif ct == 'irc': - target_pools[ct] = config.servers - _dbg('target_pool[irc]: %d servers' % len(config.servers)) + all_servers = config.servers + available = irc_target_stats.get_available(all_servers) + target_pools[ct] = available if available else all_servers + _dbg('target_pool[irc]: %d/%d servers available' % (len(target_pools[ct]), len(all_servers))) elif ct == 'judges': - # Filter out judges in cooldown (blocked/rate-limited) all_judges = list(judges.keys()) - available = judge_stats.get_available_judges(all_judges) + available = judge_stats.get_available(all_judges) target_pools[ct] = available if available else all_judges elif ct == 'ssl': - target_pools[ct] = ssl_targets - _dbg('target_pool[ssl]: %d targets' % len(ssl_targets)) + available = ssl_target_stats.get_available(ssl_targets) + target_pools[ct] = available if available else ssl_targets + _dbg('target_pool[ssl]: %d/%d targets available' % (len(target_pools[ct]), len(ssl_targets))) else: # head - target_pools[ct] = list(regexes.keys()) - _dbg('target_pool[%s]: %d targets' % (ct, len(regexes))) + all_targets = list(regexes.keys()) + available = head_target_stats.get_available(all_targets) + target_pools[ct] = available if available else all_targets + _dbg('target_pool[%s]: %d/%d targets available' % (ct, len(target_pools[ct]), len(all_targets))) # create all jobs first, then shuffle for interleaving all_jobs = [] @@ -1852,15 +1890,25 @@ class Proxywatchd(): # Judge stats (when using judges checktype) if 'judges' in config.watchd.checktypes: js = judge_stats.get_stats() + # Remap 'target' -> 'judge' for dashboard compatibility + top = [dict(j, judge=j['target']) for j in js.get('top', [])[:5]] stats_data['judges'] = { 'total': js.get('total', 0), 'available': js.get('available', 0), 'in_cooldown': js.get('in_cooldown', 0), - 'top_judges': js.get('top', [])[:5] # top 5 most successful + 'top_judges': top, } else: stats_data['judges'] = None + # Target health stats (all target pools) + stats_data['target_health'] = { + 'head': head_target_stats.get_stats(), + 'ssl': ssl_target_stats.get_stats(), + 'irc': irc_target_stats.get_stats(), + 'judges': judge_stats.get_stats(), + } + # Scraper/engine stats if scraper_available: scraper_stats = scraper_module.get_scraper_stats() diff --git a/stats.py b/stats.py index 1b6dc95..d170198 100644 --- a/stats.py +++ b/stats.py @@ -14,60 +14,64 @@ def try_div(a, b): return 0 -class JudgeStats(): - """Track per-judge success/failure rates for reliability scoring. +class TargetStats(): + """Track per-target success/failure rates with cooldown. - Judges that frequently block or rate-limit are temporarily avoided. - Stats decay over time to allow recovery. + Targets that frequently block or fail are temporarily avoided. + Block counters reset on success or cooldown expiry. + + Used for all target pools: judges, head targets, SSL targets, IRC servers. """ def __init__(self, cooldown_seconds=300, block_threshold=3): self.lock = threading.Lock() - self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} - self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges - self.block_threshold = block_threshold # consecutive blocks before cooldown + self.stats = {} # target -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} + self.cooldown_seconds = cooldown_seconds + self.block_threshold = block_threshold - def record_success(self, judge): - """Record successful judge response.""" - with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['success'] += 1 - # Reset block count on success - self.stats[judge]['block'] = 0 + def _ensure(self, target): + if target not in self.stats: + self.stats[target] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - def record_failure(self, judge): - """Record judge failure (proxy failed, not judge block).""" + def record_success(self, target): + """Record successful target response.""" with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['fail'] += 1 + self._ensure(target) + self.stats[target]['success'] += 1 + self.stats[target]['block'] = 0 - def record_block(self, judge): - """Record judge blocking the proxy (403, captcha, rate-limit).""" + def record_failure(self, target): + """Record target failure (soft -- doesn't trigger cooldown).""" with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['block'] += 1 - self.stats[judge]['last_block'] = time.time() + self._ensure(target) + self.stats[target]['fail'] += 1 - def is_available(self, judge): - """Check if judge is available (not in cooldown).""" + def record_block(self, target): + """Record target block (403, captcha, DNS failure, rate-limit).""" with self.lock: - if judge not in self.stats: + self._ensure(target) + self.stats[target]['block'] += 1 + self.stats[target]['last_block'] = time.time() + + def is_available(self, target): + """Check if target is available (not in cooldown).""" + with self.lock: + if target not in self.stats: return True - s = self.stats[judge] - # Check if in cooldown period + s = self.stats[target] if s['block'] >= self.block_threshold: if (time.time() - s['last_block']) < self.cooldown_seconds: return False - # Cooldown expired, reset block count s['block'] = 0 return True + def get_available(self, target_list): + """Return targets not in cooldown.""" + return [t for t in target_list if self.is_available(t)] + def get_available_judges(self, judge_list): - """Return list of judges not in cooldown.""" - return [j for j in judge_list if self.is_available(j)] + """Compat alias for get_available().""" + return self.get_available(judge_list) def status_line(self): """Return status summary for logging.""" @@ -76,7 +80,7 @@ class JudgeStats(): blocked = sum(1 for s in self.stats.values() if s['block'] >= self.block_threshold and (time.time() - s['last_block']) < self.cooldown_seconds) - return 'judges: %d total, %d in cooldown' % (total, blocked) + return '%d total, %d in cooldown' % (total, blocked) def get_stats(self): """Return statistics dict for API/dashboard.""" @@ -87,18 +91,21 @@ class JudgeStats(): if s['block'] >= self.block_threshold and (now - s['last_block']) < self.cooldown_seconds) available = total - in_cooldown - # Get top judges by success count top = [] - for judge, s in self.stats.items(): + for target, s in self.stats.items(): total_tests = s['success'] + s['fail'] if total_tests > 0: success_pct = (s['success'] * 100.0) / total_tests - top.append({'judge': judge, 'success': s['success'], + top.append({'target': target, 'success': s['success'], 'tests': total_tests, 'rate': round(success_pct, 1)}) top.sort(key=lambda x: x['success'], reverse=True) return {'total': total, 'available': available, 'in_cooldown': in_cooldown, 'top': top} +# Backwards-compatible alias +JudgeStats = TargetStats + + # HTTP targets - check for specific headers regexes = { 'www.facebook.com': 'X-FB-Debug',