proxywatchd: track failures by protocol and SSL category

This commit is contained in:
Username
2025-12-25 02:51:47 +01:00
parent 9429d24fd5
commit 2201515b10

View File

@@ -19,8 +19,10 @@ import string
import re import re
import heapq import heapq
import signal import signal
import network_stats
import os import os
import ssl
try: try:
import Queue import Queue
except ImportError: except ImportError:
@@ -285,9 +287,10 @@ class Stats():
# Failure category tracking # Failure category tracking
self.fail_categories = {} self.fail_categories = {}
# Protocol tracking (tested and passed separately) # Protocol tracking (tested, passed, and failed separately)
self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0} self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0} self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto
# Legacy alias for compatibility # Legacy alias for compatibility
self.by_proto = self.proto_passed self.by_proto = self.proto_passed
@@ -330,6 +333,7 @@ class Stats():
self.ssl_tested = 0 self.ssl_tested = 0
self.ssl_passed = 0 self.ssl_passed = 0
self.ssl_failed = 0 self.ssl_failed = 0
self.ssl_fail_categories = {} # Track SSL failures by category
self.mitm_detected = 0 self.mitm_detected = 0
self.cert_errors = 0 self.cert_errors = 0
@@ -381,6 +385,9 @@ class Stats():
self.failed += 1 self.failed += 1
if category: if category:
self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
# Track failures by protocol
if proto and proto in self.proto_failed:
self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1
# SSL/TLS tracking # SSL/TLS tracking
if ssl_test: if ssl_test:
@@ -389,6 +396,9 @@ class Stats():
self.ssl_passed += 1 self.ssl_passed += 1
else: else:
self.ssl_failed += 1 self.ssl_failed += 1
# Track which error caused the SSL failure
if category:
self.ssl_fail_categories[category] = self.ssl_fail_categories.get(category, 0) + 1
if mitm: if mitm:
self.mitm_detected += 1 self.mitm_detected += 1
if cert_error: if cert_error:
@@ -510,16 +520,19 @@ class Stats():
return result return result
def get_proto_stats(self): def get_proto_stats(self):
"""Get protocol-specific success rates.""" """Get protocol-specific success rates and failure breakdown."""
with self.lock: with self.lock:
result = {} result = {}
for proto in ['http', 'socks4', 'socks5']: for proto in ['http', 'socks4', 'socks5']:
tested = self.proto_tested[proto] tested = self.proto_tested[proto]
passed = self.proto_passed[proto] passed = self.proto_passed[proto]
failed = sum(self.proto_failed[proto].values())
result[proto] = { result[proto] = {
'tested': tested, 'tested': tested,
'passed': passed, 'passed': passed,
'failed': failed,
'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0, 'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0,
'fail_reasons': dict(self.proto_failed[proto]) if self.proto_failed[proto] else {},
} }
return result return result
@@ -568,6 +581,14 @@ class Stats():
# Restore failure categories # Restore failure categories
if state.get('fail_categories'): if state.get('fail_categories'):
self.fail_categories = dict(state['fail_categories']) self.fail_categories = dict(state['fail_categories'])
# Restore SSL failure categories
if state.get('ssl_fail_categories'):
self.ssl_fail_categories = dict(state['ssl_fail_categories'])
# Restore protocol failure categories
if state.get('proto_failed'):
for proto in ['http', 'socks4', 'socks5']:
if proto in state['proto_failed']:
self.proto_failed[proto] = dict(state['proto_failed'][proto])
# Restore geo tracking # Restore geo tracking
if state.get('country_passed'): if state.get('country_passed'):
self.country_passed = dict(state['country_passed']) self.country_passed = dict(state['country_passed'])
@@ -632,6 +653,197 @@ def try_div(a, b):
return 0 return 0
class MITMCertStats(object):
"""Track MITM certificate statistics."""
def __init__(self):
self.lock = threading.Lock()
self.certs = {} # fingerprint -> cert_info dict
self.by_org = {} # organization -> count
self.by_issuer = {} # issuer CN -> count
self.by_proxy = {} # proxy IP -> list of fingerprints
self.total_count = 0
self.recent_certs = [] # last N certificates seen
def add_cert(self, proxy_ip, cert_info):
"""Add a MITM certificate to statistics."""
if not cert_info:
return
fp = cert_info.get('fingerprint', '')
if not fp:
return
with self.lock:
self.total_count += 1
# Store unique certs by fingerprint
if fp not in self.certs:
self.certs[fp] = cert_info
self.certs[fp]['first_seen'] = time.time()
self.certs[fp]['count'] = 1
self.certs[fp]['proxies'] = [proxy_ip]
else:
self.certs[fp]['count'] += 1
self.certs[fp]['last_seen'] = time.time()
if proxy_ip not in self.certs[fp]['proxies']:
self.certs[fp]['proxies'].append(proxy_ip)
# Track by organization
org = cert_info.get('subject_o', 'Unknown')
self.by_org[org] = self.by_org.get(org, 0) + 1
# Track by issuer
issuer = cert_info.get('issuer_cn', 'Unknown')
self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1
# Track proxies using this cert
if proxy_ip not in self.by_proxy:
self.by_proxy[proxy_ip] = []
if fp not in self.by_proxy[proxy_ip]:
self.by_proxy[proxy_ip].append(fp)
# Keep recent certs (last 50)
self.recent_certs.append({
'fingerprint': fp,
'proxy': proxy_ip,
'subject_cn': cert_info.get('subject_cn', ''),
'issuer_cn': cert_info.get('issuer_cn', ''),
'timestamp': time.time()
})
if len(self.recent_certs) > 50:
self.recent_certs.pop(0)
def get_stats(self):
"""Get MITM certificate statistics for API."""
with self.lock:
# Top organizations
top_orgs = sorted(self.by_org.items(), key=lambda x: x[1], reverse=True)[:10]
# Top issuers
top_issuers = sorted(self.by_issuer.items(), key=lambda x: x[1], reverse=True)[:10]
# Unique certs sorted by count
unique_certs = []
for fp, info in self.certs.items():
cert_entry = {'fingerprint': fp}
cert_entry.update(info)
unique_certs.append(cert_entry)
unique_certs = sorted(unique_certs, key=lambda x: x.get('count', 0), reverse=True)[:20]
return {
'total_detections': self.total_count,
'unique_certs': len(self.certs),
'unique_proxies': len(self.by_proxy),
'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
'certificates': unique_certs,
'recent': list(self.recent_certs[-20:])
}
def extract_cert_info(cert_der):
"""Extract certificate information from DER-encoded certificate.
Args:
cert_der: DER-encoded certificate bytes
Returns:
dict with certificate details or None on failure
"""
import hashlib
try:
# Decode DER to get certificate details
# Python 2/3 compatible approach using ssl module
from OpenSSL import crypto
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)
subject = x509.get_subject()
issuer = x509.get_issuer()
# Parse dates (format: YYYYMMDDhhmmssZ)
not_before = x509.get_notBefore()
not_after = x509.get_notAfter()
if isinstance(not_before, bytes):
not_before = not_before.decode('ascii')
if isinstance(not_after, bytes):
not_after = not_after.decode('ascii')
# Calculate fingerprint
fp = hashlib.sha256(cert_der).hexdigest()
return {
'fingerprint': fp[:16], # Short fingerprint for display
'fingerprint_full': fp,
'subject_cn': subject.CN or '',
'subject_o': subject.O or '',
'subject_ou': subject.OU or '',
'subject_c': subject.C or '',
'issuer_cn': issuer.CN or '',
'issuer_o': issuer.O or '',
'serial': str(x509.get_serial_number()),
'not_before': not_before,
'not_after': not_after,
'version': x509.get_version(),
'sig_algo': x509.get_signature_algorithm().decode('ascii') if hasattr(x509.get_signature_algorithm(), 'decode') else str(x509.get_signature_algorithm()),
}
except ImportError:
# Fallback if pyOpenSSL not available - basic info from hashlib
import hashlib
fp = hashlib.sha256(cert_der).hexdigest()
return {
'fingerprint': fp[:16],
'fingerprint_full': fp,
'subject_cn': '(pyOpenSSL not installed)',
'subject_o': '',
'issuer_cn': '',
'issuer_o': '',
'serial': '',
'not_before': '',
'not_after': '',
}
except Exception as e:
return None
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout):
"""Connect to target through proxy without cert verification to get MITM cert.
Args:
proxy_ip: Proxy IP address
proxy_port: Proxy port
proto: Proxy protocol (http, socks4, socks5)
torhost: Tor SOCKS5 address
target_host: Target host for SSL connection
target_port: Target port (usually 443)
timeout: Connection timeout
Returns:
dict with certificate info or None on failure
"""
try:
proxies = [
rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, proxy_ip, proxy_port)),
]
# Connect without certificate verification
sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
proxies=proxies, timeout=timeout, verifycert=False)
sock.connect()
# Get peer certificate
cert_der = sock.sock.getpeercert(binary_form=True)
sock.disconnect()
if cert_der:
return extract_cert_info(cert_der)
return None
except Exception as e:
return None
# Global MITM cert stats instance
mitm_cert_stats = MITMCertStats()
class PriorityJobQueue(object): class PriorityJobQueue(object):
"""Priority queue for proxy test jobs. """Priority queue for proxy test jobs.
@@ -838,6 +1050,7 @@ class ProxyTestState():
self.lock = threading.Lock() self.lock = threading.Lock()
self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.results = [] # list of (success, proto, duration, srv, tor, ssl)
self.completed = False self.completed = False
self.evaluated = False # for evaluate() idempotency
self.last_latency_ms = None # average latency from successful tests self.last_latency_ms = None # average latency from successful tests
self.exit_ip = None # IP seen by target server (for anonymity detection) self.exit_ip = None # IP seen by target server (for anonymity detection)
self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers
@@ -908,9 +1121,10 @@ class ProxyTestState():
the dominant failure type (or None on success) the dominant failure type (or None on success)
""" """
with self.lock: with self.lock:
if self.completed: if self.evaluated:
return (self.failcount == 0, None) # Already evaluated - return cached result
self.completed = True return (self.failcount == 0, self.last_fail_category)
self.evaluated = True
self.checktime = int(time.time()) self.checktime = int(time.time())
successes = [r for r in self.results if r['success']] successes = [r for r in self.results if r['success']]
@@ -990,20 +1204,11 @@ class ProxyTestState():
return (True, None) return (True, None)
else: else:
# Check if failures were all judge blocks (not proxy's fault) # Real proxy failure
judge_blocks = [f for f in failures if f.get('category') == 'judge_block'] self.failcount += 1
real_failures = [f for f in failures if f.get('category') != 'judge_block'] self.consecutive_success = 0
self.last_fail_category = fail_category
if judge_blocks and not real_failures: return (False, fail_category)
# All failures were judge blocks - inconclusive, don't penalize proxy
# checktime still updated so we don't immediately retest
return (False, 'judge_block')
else:
# Real proxy failure
self.failcount += 1
self.consecutive_success = 0
self.last_fail_category = fail_category
return (False, fail_category)
class TargetTestJob(): class TargetTestJob():
@@ -1020,6 +1225,7 @@ class TargetTestJob():
def run(self): def run(self):
"""Test the proxy against this job's target server.""" """Test the proxy against this job's target server."""
network_stats.set_category('proxy')
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
if not sock: if not sock:
@@ -1039,7 +1245,7 @@ class TargetTestJob():
) )
self.proxy_state.mitm = 1 self.proxy_state.mitm = 1
else: else:
self.proxy_state.record_result(False, category=err_cat) self.proxy_state.record_result(False, category=err_cat, ssl=is_ssl)
return return
try: try:
@@ -1081,10 +1287,17 @@ class TargetTestJob():
# Check if judge is blocking us (not a proxy failure) # Check if judge is blocking us (not a proxy failure)
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv): if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
judge_stats.record_block(srv) judge_stats.record_block(srv)
# Don't count as proxy failure - judge is blocking # Judge block = proxy worked, we got HTTP response, just no IP
self.proxy_state.record_result(False, category='judge_block') # Count as success without exit_ip
block_elapsed = time.time() - duration
self.proxy_state.record_result(
True, proto=proto, duration=block_elapsed,
srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
reveals_headers=None
)
if config.watchd.debug: if config.watchd.debug:
_log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug') _log('judge %s challenged proxy %s (counted as success)' % (
srv, self.proxy_state.proxy), 'debug')
else: else:
if self.checktype == 'judges': if self.checktype == 'judges':
judge_stats.record_failure(srv) judge_stats.record_failure(srv)
@@ -1138,6 +1351,7 @@ class TargetTestJob():
torhost = pool.get_tor_host(self.worker_id) torhost = pool.get_tor_host(self.worker_id)
else: else:
torhost = random.choice(config.torhosts) torhost = random.choice(config.torhosts)
network_stats.set_tor_node(torhost)
if proto == 'socks4': if proto == 'socks4':
srv = socks4_resolve(connect_host, server_port) srv = socks4_resolve(connect_host, server_port)
else: else:
@@ -1151,9 +1365,13 @@ class TargetTestJob():
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
] ]
# Adaptive timeout: give proxies with failures slightly more time
# Linear increase capped at 5s extra (0 fails=base, 10 fails=+5s max)
adaptive_timeout = config.watchd.timeout + min(ps.failcount * 0.5, 5)
try: try:
sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
proxies=proxies, timeout=config.watchd.timeout, proxies=proxies, timeout=adaptive_timeout,
verifycert=verifycert) verifycert=verifycert)
sock.connect() sock.connect()
@@ -1212,6 +1430,22 @@ class TargetTestJob():
elapsed = time.time() - duration elapsed = time.time() - duration
if pool: if pool:
pool.record_success(torhost, elapsed) pool.record_success(torhost, elapsed)
# Extract MITM certificate info (async to not block)
try:
cert_info = get_mitm_certificate(
ps.ip, ps.port, proto, torhost,
connect_host, server_port, config.watchd.timeout
)
if cert_info:
mitm_cert_stats.add_cert(ps.ip, cert_info)
if config.watchd.debug:
_log('MITM cert: %s (CN=%s, O=%s)' % (
cert_info.get('fingerprint', ''),
cert_info.get('subject_cn', ''),
cert_info.get('subject_o', '')), 'debug')
except Exception as e:
if config.watchd.debug:
_log('failed to extract MITM cert: %s' % str(e), 'debug')
return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm' return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm'
except KeyboardInterrupt as e: except KeyboardInterrupt as e:
@@ -1718,13 +1952,17 @@ class Proxywatchd():
with self.stats.lock: with self.stats.lock:
ssl_tested = self.stats.ssl_tested ssl_tested = self.stats.ssl_tested
ssl_passed = self.stats.ssl_passed ssl_passed = self.stats.ssl_passed
mitm_stats = mitm_cert_stats.get_stats()
stats_data['ssl'] = { stats_data['ssl'] = {
'tested': ssl_tested, 'tested': ssl_tested,
'passed': ssl_passed, 'passed': ssl_passed,
'failed': self.stats.ssl_failed, 'failed': self.stats.ssl_failed,
'fail_categories': dict(self.stats.ssl_fail_categories),
'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0, 'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0,
'mitm_detected': self.stats.mitm_detected, 'mitm_detected': self.stats.mitm_detected,
'cert_errors': self.stats.cert_errors 'cert_errors': self.stats.cert_errors,
'mitm_unique_certs': mitm_stats.get('unique_certs', 0),
'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0)
} }
# Thread info # Thread info
@@ -1733,6 +1971,7 @@ class Proxywatchd():
stats_data['max_threads'] = self.scaler.max_threads stats_data['max_threads'] = self.scaler.max_threads
stats_data['queue_size'] = self.job_queue.qsize() stats_data['queue_size'] = self.job_queue.qsize()
stats_data['checktype'] = config.watchd.checktype stats_data['checktype'] = config.watchd.checktype
stats_data['use_ssl'] = config.watchd.use_ssl
stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)
@@ -1804,6 +2043,12 @@ class Proxywatchd():
stats_data['engines_backoff'] = 0 stats_data['engines_backoff'] = 0
stats_data['scraper'] = None stats_data['scraper'] = None
# Network usage stats
stats_data['network'] = network_stats.get_stats()
# MITM certificate stats (detailed)
stats_data['mitm'] = mitm_cert_stats.get_stats()
return stats_data return stats_data
def _run(self): def _run(self):