diff --git a/proxywatchd.py b/proxywatchd.py index 2bc2f14..f7e2460 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -19,8 +19,10 @@ import string import re import heapq import signal +import network_stats import os +import ssl try: import Queue except ImportError: @@ -285,9 +287,10 @@ class Stats(): # Failure category tracking self.fail_categories = {} - # Protocol tracking (tested and passed separately) + # Protocol tracking (tested, passed, and failed separately) self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0} self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0} + self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto # Legacy alias for compatibility self.by_proto = self.proto_passed @@ -330,6 +333,7 @@ class Stats(): self.ssl_tested = 0 self.ssl_passed = 0 self.ssl_failed = 0 + self.ssl_fail_categories = {} # Track SSL failures by category self.mitm_detected = 0 self.cert_errors = 0 @@ -381,6 +385,9 @@ class Stats(): self.failed += 1 if category: self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 + # Track failures by protocol + if proto and proto in self.proto_failed: + self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1 # SSL/TLS tracking if ssl_test: @@ -389,6 +396,9 @@ class Stats(): self.ssl_passed += 1 else: self.ssl_failed += 1 + # Track which error caused the SSL failure + if category: + self.ssl_fail_categories[category] = self.ssl_fail_categories.get(category, 0) + 1 if mitm: self.mitm_detected += 1 if cert_error: @@ -510,16 +520,19 @@ class Stats(): return result def get_proto_stats(self): - """Get protocol-specific success rates.""" + """Get protocol-specific success rates and failure breakdown.""" with self.lock: result = {} for proto in ['http', 'socks4', 'socks5']: tested = self.proto_tested[proto] passed = self.proto_passed[proto] + failed = sum(self.proto_failed[proto].values()) result[proto] = { 'tested': tested, 'passed': passed, + 
class MITMCertStats(object):
    """Aggregate statistics about certificates presented by MITM proxies.

    All mutation and reads of the counters go through ``self.lock``.
    Certificates are keyed by the ``'fingerprint'`` value of the
    ``cert_info`` dict handed to :meth:`add_cert`.
    """

    # Upper bound on the number of entries retained in ``recent_certs``.
    RECENT_LIMIT = 50

    def __init__(self):
        self.lock = threading.Lock()
        self.certs = {}         # fingerprint -> cert_info dict (plus counters)
        self.by_org = {}        # organization -> detection count
        self.by_issuer = {}     # issuer CN -> detection count
        self.by_proxy = {}      # proxy IP -> list of fingerprints seen
        self.total_count = 0    # every accepted add_cert() call
        self.recent_certs = []  # most recent detections, oldest first

    def add_cert(self, proxy_ip, cert_info):
        """Record one MITM certificate sighting.

        Args:
            proxy_ip: IP of the proxy that presented the certificate.
            cert_info: dict as produced by ``extract_cert_info``; ignored
                when empty/None or when it has no ``'fingerprint'`` value.

        The passed-in dict is never modified; a copy is stored internally.
        """
        if not cert_info:
            return
        fp = cert_info.get('fingerprint', '')
        if not fp:
            return

        now = time.time()
        with self.lock:
            self.total_count += 1

            # Store unique certs by fingerprint
            entry = self.certs.get(fp)
            if entry is None:
                # Copy: bookkeeping fields must not leak into the caller's dict
                entry = dict(cert_info)
                entry['first_seen'] = now
                entry['last_seen'] = now
                entry['count'] = 1
                entry['proxies'] = [proxy_ip]
                self.certs[fp] = entry
            else:
                entry['count'] += 1
                entry['last_seen'] = now
                if proxy_ip not in entry['proxies']:
                    entry['proxies'].append(proxy_ip)

            # ``or`` rather than a .get() default: the field is usually
            # present but empty, which should still count as 'Unknown'.
            org = cert_info.get('subject_o') or 'Unknown'
            self.by_org[org] = self.by_org.get(org, 0) + 1

            issuer = cert_info.get('issuer_cn') or 'Unknown'
            self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1

            # Track which fingerprints each proxy has presented
            seen = self.by_proxy.setdefault(proxy_ip, [])
            if fp not in seen:
                seen.append(fp)

            # Keep only the most recent RECENT_LIMIT detections
            self.recent_certs.append({
                'fingerprint': fp,
                'proxy': proxy_ip,
                'subject_cn': cert_info.get('subject_cn', ''),
                'issuer_cn': cert_info.get('issuer_cn', ''),
                'timestamp': now,
            })
            if len(self.recent_certs) > self.RECENT_LIMIT:
                del self.recent_certs[:-self.RECENT_LIMIT]

    def get_stats(self):
        """Return a snapshot of the MITM certificate statistics.

        Returns:
            dict with ``total_detections``, ``unique_certs``,
            ``unique_proxies``, ``top_organizations`` (max 10),
            ``top_issuers`` (max 10), ``certificates`` (max 20, by count)
            and ``recent`` (last 20 detections). Nested lists/dicts are
            copies, so the caller may use them without holding the lock.
        """
        with self.lock:
            top_orgs = sorted(self.by_org.items(),
                              key=lambda x: x[1], reverse=True)[:10]
            top_issuers = sorted(self.by_issuer.items(),
                                 key=lambda x: x[1], reverse=True)[:10]

            unique_certs = []
            for fp, info in self.certs.items():
                cert_entry = {'fingerprint': fp}
                cert_entry.update(info)
                # Detach the mutable proxies list from internal state
                cert_entry['proxies'] = list(info.get('proxies', []))
                unique_certs.append(cert_entry)
            unique_certs.sort(key=lambda x: x.get('count', 0), reverse=True)

            return {
                'total_detections': self.total_count,
                'unique_certs': len(self.certs),
                'unique_proxies': len(self.by_proxy),
                'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
                'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
                'certificates': unique_certs[:20],
                'recent': [dict(r) for r in self.recent_certs[-20:]],
            }


def extract_cert_info(cert_der):
    """Extract certificate information from a DER-encoded certificate.

    Args:
        cert_der: DER-encoded certificate bytes

    Returns:
        dict with certificate details, or None when the input is empty,
        is not hashable as bytes, or cannot be parsed. When pyOpenSSL is
        not importable, a dict with the same keys is returned carrying only
        the fingerprints and a placeholder ``subject_cn``.
    """
    import hashlib

    if not cert_der:
        return None
    try:
        fp = hashlib.sha256(cert_der).hexdigest()
    except TypeError:
        # Not a bytes-like object; nothing sensible to report
        return None

    # Same key set for both the parsed and the fallback result
    info = {
        'fingerprint': fp[:16],  # Short fingerprint for display
        'fingerprint_full': fp,
        'subject_cn': '',
        'subject_o': '',
        'subject_ou': '',
        'subject_c': '',
        'issuer_cn': '',
        'issuer_o': '',
        'serial': '',
        'not_before': '',
        'not_after': '',
        'version': None,
        'sig_algo': '',
    }

    try:
        from OpenSSL import crypto
        x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)

        subject = x509.get_subject()
        issuer = x509.get_issuer()

        # Dates come back as YYYYMMDDhhmmssZ, possibly as bytes
        not_before = x509.get_notBefore()
        not_after = x509.get_notAfter()
        if isinstance(not_before, bytes):
            not_before = not_before.decode('ascii')
        if isinstance(not_after, bytes):
            not_after = not_after.decode('ascii')

        sig_algo = x509.get_signature_algorithm()
        if hasattr(sig_algo, 'decode'):
            sig_algo = sig_algo.decode('ascii')
        else:
            sig_algo = str(sig_algo)

        info.update({
            'subject_cn': subject.CN or '',
            'subject_o': subject.O or '',
            'subject_ou': subject.OU or '',
            'subject_c': subject.C or '',
            'issuer_cn': issuer.CN or '',
            'issuer_o': issuer.O or '',
            'serial': str(x509.get_serial_number()),
            'not_before': not_before,
            'not_after': not_after,
            'version': x509.get_version(),
            'sig_algo': sig_algo,
        })
        return info
    except ImportError:
        # pyOpenSSL not available - report the fingerprint only
        info['subject_cn'] = '(pyOpenSSL not installed)'
        return info
    except Exception:
        # Unparseable certificate: deliberately best-effort
        return None


def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout):
    """Connect to target through proxy without cert verification to get MITM cert.

    Args:
        proxy_ip: Proxy IP address
        proxy_port: Proxy port
        proto: Proxy protocol (http, socks4, socks5)
        torhost: Tor SOCKS5 address
        target_host: Target host for SSL connection
        target_port: Target port (usually 443)
        timeout: Connection timeout

    Returns:
        dict with certificate info or None on failure
    """
    sock = None
    try:
        proxies = [
            rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
            rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, proxy_ip, proxy_port)),
        ]

        # Connect without certificate verification
        sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
                                 proxies=proxies, timeout=timeout, verifycert=False)
        sock.connect()

        # Get peer certificate in DER form
        cert_der = sock.sock.getpeercert(binary_form=True)
        if not cert_der:
            return None
        return extract_cert_info(cert_der)
    except Exception:
        # Best-effort probe: any failure simply means "no certificate"
        return None
    finally:
        # Always release the connection, including when getpeercert() raised.
        # NOTE(review): assumes disconnect() is harmless after a failed
        # connect(); errors are swallowed so they cannot mask the result.
        if sock is not None:
            try:
                sock.disconnect()
            except Exception:
                pass


# Global MITM cert stats instance
mitm_cert_stats = MITMCertStats()
@@ -838,6 +1050,7 @@ class ProxyTestState(): self.lock = threading.Lock() self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False + self.evaluated = False # for evaluate() idempotency self.last_latency_ms = None # average latency from successful tests self.exit_ip = None # IP seen by target server (for anonymity detection) self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers @@ -908,9 +1121,10 @@ class ProxyTestState(): the dominant failure type (or None on success) """ with self.lock: - if self.completed: - return (self.failcount == 0, None) - self.completed = True + if self.evaluated: + # Already evaluated - return cached result + return (self.failcount == 0, self.last_fail_category) + self.evaluated = True self.checktime = int(time.time()) successes = [r for r in self.results if r['success']] @@ -990,20 +1204,11 @@ class ProxyTestState(): return (True, None) else: - # Check if failures were all judge blocks (not proxy's fault) - judge_blocks = [f for f in failures if f.get('category') == 'judge_block'] - real_failures = [f for f in failures if f.get('category') != 'judge_block'] - - if judge_blocks and not real_failures: - # All failures were judge blocks - inconclusive, don't penalize proxy - # checktime still updated so we don't immediately retest - return (False, 'judge_block') - else: - # Real proxy failure - self.failcount += 1 - self.consecutive_success = 0 - self.last_fail_category = fail_category - return (False, fail_category) + # Real proxy failure + self.failcount += 1 + self.consecutive_success = 0 + self.last_fail_category = fail_category + return (False, fail_category) class TargetTestJob(): @@ -1020,6 +1225,7 @@ class TargetTestJob(): def run(self): """Test the proxy against this job's target server.""" + network_stats.set_category('proxy') sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() if not sock: @@ -1039,7 +1245,7 @@ class TargetTestJob(): ) 
self.proxy_state.mitm = 1 else: - self.proxy_state.record_result(False, category=err_cat) + self.proxy_state.record_result(False, category=err_cat, ssl=is_ssl) return try: @@ -1081,10 +1287,17 @@ class TargetTestJob(): # Check if judge is blocking us (not a proxy failure) if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv): judge_stats.record_block(srv) - # Don't count as proxy failure - judge is blocking - self.proxy_state.record_result(False, category='judge_block') + # Judge block = proxy worked, we got HTTP response, just no IP + # Count as success without exit_ip + block_elapsed = time.time() - duration + self.proxy_state.record_result( + True, proto=proto, duration=block_elapsed, + srv=srv, tor=tor, ssl=is_ssl, exit_ip=None, + reveals_headers=None + ) if config.watchd.debug: - _log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug') + _log('judge %s challenged proxy %s (counted as success)' % ( + srv, self.proxy_state.proxy), 'debug') else: if self.checktype == 'judges': judge_stats.record_failure(srv) @@ -1138,6 +1351,7 @@ class TargetTestJob(): torhost = pool.get_tor_host(self.worker_id) else: torhost = random.choice(config.torhosts) + network_stats.set_tor_node(torhost) if proto == 'socks4': srv = socks4_resolve(connect_host, server_port) else: @@ -1151,9 +1365,13 @@ class TargetTestJob(): rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), ] + # Adaptive timeout: give proxies with failures slightly more time + # Linear increase capped at 5s extra (0 fails=base, 10 fails=+5s max) + adaptive_timeout = config.watchd.timeout + min(ps.failcount * 0.5, 5) + try: sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, - proxies=proxies, timeout=config.watchd.timeout, + proxies=proxies, timeout=adaptive_timeout, verifycert=verifycert) sock.connect() @@ -1212,6 +1430,22 @@ class TargetTestJob(): elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) + # Extract MITM certificate info 
(async to not block) + try: + cert_info = get_mitm_certificate( + ps.ip, ps.port, proto, torhost, + connect_host, server_port, config.watchd.timeout + ) + if cert_info: + mitm_cert_stats.add_cert(ps.ip, cert_info) + if config.watchd.debug: + _log('MITM cert: %s (CN=%s, O=%s)' % ( + cert_info.get('fingerprint', ''), + cert_info.get('subject_cn', ''), + cert_info.get('subject_o', '')), 'debug') + except Exception as e: + if config.watchd.debug: + _log('failed to extract MITM cert: %s' % str(e), 'debug') return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm' except KeyboardInterrupt as e: @@ -1718,13 +1952,17 @@ class Proxywatchd(): with self.stats.lock: ssl_tested = self.stats.ssl_tested ssl_passed = self.stats.ssl_passed + mitm_stats = mitm_cert_stats.get_stats() stats_data['ssl'] = { 'tested': ssl_tested, 'passed': ssl_passed, 'failed': self.stats.ssl_failed, + 'fail_categories': dict(self.stats.ssl_fail_categories), 'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0, 'mitm_detected': self.stats.mitm_detected, - 'cert_errors': self.stats.cert_errors + 'cert_errors': self.stats.cert_errors, + 'mitm_unique_certs': mitm_stats.get('unique_certs', 0), + 'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0) } # Thread info @@ -1733,6 +1971,7 @@ class Proxywatchd(): stats_data['max_threads'] = self.scaler.max_threads stats_data['queue_size'] = self.job_queue.qsize() stats_data['checktype'] = config.watchd.checktype + stats_data['use_ssl'] = config.watchd.use_ssl stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) @@ -1804,6 +2043,12 @@ class Proxywatchd(): stats_data['engines_backoff'] = 0 stats_data['scraper'] = None + # Network usage stats + stats_data['network'] = network_stats.get_stats() + + # MITM certificate stats (detailed) + stats_data['mitm'] = mitm_cert_stats.get_stats() + return stats_data def 
_run(self):