From e46a3f6ef1c82b615456942b49e94a0275eff7ff Mon Sep 17 00:00:00 2001 From: Username Date: Sun, 28 Dec 2025 14:12:50 +0100 Subject: [PATCH] proxywatchd: add DNS cache TTL, IP validation, debug logging - DNS cache entries now expire after 1 hour (DNS_CACHE_TTL) - Add is_valid_ip() to validate extracted IPs have valid octets - Add debug logging to silent exception blocks (ASN lookup, socket disconnect) - Convert rwip() to static method with cleaner int() implementation - Add _sample_dbg() for sampled diagnostic logging - Add set_config() for worker mode imports - Remove unused tor_targets (tor checktype was removed) - Move tor_proxy_url to misc.py for shared use --- proxywatchd.py | 252 +++++++++++++++++++++++++++++-------------------- 1 file changed, 151 insertions(+), 101 deletions(-) diff --git a/proxywatchd.py b/proxywatchd.py index 8677989..79a4ccd 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -40,7 +40,7 @@ from config import Config import mysqlite import dbs -from misc import _log, categorize_error +from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error import rocksock import connection_pool @@ -54,12 +54,23 @@ except ImportError: config = Config() +def set_config(cfg): + """Set the config object (used when imported from ppf.py).""" + global config + config = cfg + _run_standalone = False -cached_dns = {} +cached_dns = {} # {hostname: (ip, timestamp)} +DNS_CACHE_TTL = 3600 # 1 hour # Debug mode for proxy check path - set via PPF_DEBUG env or config _debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy') +# Sampling debug: print detailed diagnostics for every Nth test +_sample_debug_interval = 50 # Print debug for every 50th test (lowered for diagnosis) +_sample_debug_counter = 0 +_sample_debug_lock = threading.Lock() + def _dbg(msg, proxy=None): """Debug log for proxy check path. Only logs when PPF_DEBUG=1.""" if _debug_proxy: @@ -67,8 +78,31 @@ def _dbg(msg, proxy=None): # Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default) _log('%s%s' % (prefix, msg), 'dbg') +def _sample_dbg(msg, proxy=None, force=False): + """Sampled debug: log every Nth test for diagnostics without flooding.""" + global _sample_debug_counter + should_log = force + if not should_log: + with _sample_debug_lock: + _sample_debug_counter += 1 + if _sample_debug_counter >= _sample_debug_interval: + _sample_debug_counter = 0 + should_log = True + if should_log: + prefix = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] ' + _log('%s%s' % (prefix, msg), 'diag') + # IP pattern for judge services (validates response contains valid IP in body) IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' + +def is_valid_ip(ip_str): + """Validate IP address octets are 0-255.""" + try: + parts = ip_str.split('.') + return len(parts) == 4 and all(0 <= int(p) <= 255 for p in parts) + except (ValueError, AttributeError): + return False + # Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)' @@ -273,13 +307,6 @@ ssl_targets = [ 'www.letsencrypt.org', ] -# Tor check targets - verify proxy exits through Tor network -# Response contains JSON with IsTor: true/false -tor_targets = [ - 'check.torproject.org/api/ip', -] - - class Stats(): """Track and report comprehensive runtime statistics.""" @@ -301,8 +328,7 @@ class Stats(): self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0} self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0} self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto - # Legacy alias for compatibility - self.by_proto = self.proto_passed + self.by_proto = self.proto_passed # Alias for dashboard API # Time series history (5s intervals) self.rate_history = [] @@ -399,6 +425,11 @@ class Stats(): # Track failures by protocol if proto and proto in self.proto_failed: self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1 + # Log failure category breakdown every 1000 failures + if self.failed % 1000 == 0: + top_cats = sorted(self.fail_categories.items(), key=lambda x: -x[1])[:5] + cats_str = ', '.join(['%s:%d' % (c, n) for c, n in top_cats]) + _log('fail breakdown (%d total): %s' % (self.failed, cats_str), 'diag') # SSL/TLS tracking if ssl_test: @@ -677,19 +708,6 @@ def try_div(a, b): return 0 -def tor_proxy_url(torhost): - """Generate Tor SOCKS5 proxy URL with random credentials for circuit isolation. - - Tor treats different username:password as separate streams, using different - circuits. This ensures each connection gets a fresh circuit. - """ - # 8 random alphanumeric chars for user and pass - chars = string.ascii_lowercase + string.digits - user = ''.join(random.choice(chars) for _ in range(8)) - passwd = ''.join(random.choice(chars) for _ in range(8)) - return 'socks5://%s:%s@%s' % (user, passwd, torhost) - - class MITMCertStats(object): """Track MITM certificate statistics.""" @@ -1081,27 +1099,37 @@ class ThreadScaler(object): current_threads, queue_size, self.target_queue_per_thread) def socks4_resolve(srvname, server_port): - srv = srvname - if srv in cached_dns: - srv = cached_dns[srvname] - if config.watchd.debug: - _log("using cached ip (%s) for %s"%(srv, srvname), "debug") - else: - dns_fail = False - try: - af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) - if sa is not None: - cached_dns[srvname] = sa[0] - srv = sa[0] - else: dns_fail = True - except rocksock.RocksockException as e: - assert(e.get_errortype() == rocksock.RS_ET_GAI) + """Resolve hostname to IP for SOCKS4 (which requires numeric IP). + + Caches results for DNS_CACHE_TTL seconds to avoid repeated lookups. + """ + now = time.time() + # Check cache with TTL + if srvname in cached_dns: + ip, ts = cached_dns[srvname] + if now - ts < DNS_CACHE_TTL: + if config.watchd.debug: + _log("using cached ip (%s) for %s" % (ip, srvname), "debug") + return ip + # Expired - fall through to re-resolve + + # Resolve hostname + dns_fail = False + try: + af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) + if sa is not None: + cached_dns[srvname] = (sa[0], now) + return sa[0] + else: dns_fail = True - if dns_fail: - fail_inc = 0 - _log("could not resolve connection target %s"%srvname, "ERROR") - return False - return srv + except rocksock.RocksockException as e: + assert(e.get_errortype() == rocksock.RS_ET_GAI) + dns_fail = True + + if dns_fail: + _log("could not resolve connection target %s" % srvname, "ERROR") + return False + return srvname class ProxyTestState(): @@ -1194,14 +1222,10 @@ class ProxyTestState(): with self.lock: return len(self.results) >= self.num_targets - def rwip(self, ip): + @staticmethod + def rwip(ip): """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10).""" - n = [] - for b in ip.split('.'): - while b[0] == '0' and len(b) > 1: - b = b[1:] - n.append(b) - return '.'.join(n) + return '.'.join(str(int(b)) for b in ip.split('.')) def evaluate(self): """Evaluate results after all tests complete. @@ -1255,8 +1279,9 @@ class ProxyTestState(): asn_result = asndb.lookup(self.ip) if asn_result and asn_result[0]: self.asn = asn_result[0] - except Exception: - pass + except Exception as e: + if config.watchd.debug: + _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug') self.proto = last_good['proto'] self.failcount = 0 @@ -1291,7 +1316,7 @@ class ProxyTestState(): _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % ( last_good['proto'], self.ip, self.port, self.country, last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']), - num_success, self.num_targets), 'xxxxx') + num_success, self.num_targets), 'info') _dbg('PASS: failcount=0', self.proxy) return (True, None) @@ -1318,6 +1343,13 @@ class TargetTestJob(): def run(self): """Test the proxy against this job's target server.""" + # DIAGNOSTIC: Verify run() is being called + global _sample_debug_counter + with _sample_debug_lock: + _sample_debug_counter += 1 + if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0: + _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter, + self.proxy_state.proxy, self.target_srv, self.checktype), 'info') network_stats.set_category('proxy') _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy) sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() @@ -1345,17 +1377,16 @@ class TargetTestJob(): try: recv = sock.recv(-1) + _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy) # Select regex based on check type (or fallback target) if 'check.torproject.org' in srv: - # Tor API fallback or tor checktype + # Tor API fallback (judge using torproject.org) regex = r'"IsTor"\s*:\s*true' elif self.checktype == 'irc': regex = '^(:|NOTICE|ERROR)' elif self.checktype == 'judges': regex = judges[srv] - elif self.checktype == 'tor': - regex = r'"IsTor"\s*:\s*true' elif self.checktype == 'ssl': # Should not reach here - ssl returns before recv self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl) @@ -1370,9 +1401,9 @@ class TargetTestJob(): # Extract exit IP from judge/tor response exit_ip = None reveals_headers = None - if self.checktype == 'judges' or self.checktype == 'tor' or 'check.torproject.org' in srv: + if self.checktype == 'judges' or 'check.torproject.org' in srv: ip_match = re.search(IP_PATTERN, recv) - if ip_match: + if ip_match and is_valid_ip(ip_match.group(0)): exit_ip = ip_match.group(0) if self.checktype == 'judges' and 'check.torproject.org' not in srv: # Check for header echo judge (elite detection) @@ -1421,9 +1452,14 @@ class TargetTestJob(): """Connect to target through the proxy and send test packet.""" ps = self.proxy_state _dbg('_connect_and_test: target=%s checktype=%s' % (self.target_srv, self.checktype), ps.proxy) + # Always log first test to verify code path + global _sample_debug_counter + if _sample_debug_counter == 0: + _log('FIRST TEST: proxy=%s target=%s check=%s' % (ps.proxy, self.target_srv, self.checktype), 'info') + _sample_dbg('TEST START: proxy=%s target=%s check=%s' % (ps.proxy, self.target_srv, self.checktype), ps.proxy) srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv - # For judges/tor, extract host from 'host/path' format - if (self.checktype == 'judges' or self.checktype == 'tor') and '/' in srvname: + # For judges, extract host from 'host/path' format + if self.checktype == 'judges' and '/' in srvname: connect_host = srvname.split('/')[0] else: connect_host = srvname @@ -1435,7 +1471,7 @@ class TargetTestJob(): server_port = 443 verifycert = True else: - # head, judges, tor, irc: always use plain HTTP + # head, judges, irc: always use plain HTTP use_ssl = 0 ssl_only_check = False verifycert = False @@ -1483,8 +1519,11 @@ class TargetTestJob(): proxies=proxies, timeout=adaptive_timeout, verifycert=verifycert) _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy) + _sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % ( + torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy) sock.connect() _dbg('connected OK', ps.proxy) + _sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy) # SSL-only check: handshake passed, no request needed if ssl_only_check: @@ -1496,13 +1535,13 @@ class TargetTestJob(): if self.checktype == 'irc': sock.send('NICK\n') - elif self.checktype == 'judges' or self.checktype == 'tor': - # GET request to receive body (IP for judges, JSON for tor) + elif self.checktype == 'judges': + # GET request to receive body (IP) sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % ( srvname.split('/', 1)[1] if '/' in srvname else '', srvname.split('/')[0] )) - else: # http - HEAD is sufficient for header checks + else: # head - HEAD is sufficient for header checks sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) # Record success in pool if pool: @@ -1518,6 +1557,8 @@ class TargetTestJob(): et = e.get_errortype() err = e.get_error() fp = e.get_failedproxy() + _sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % ( + ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy) sock.disconnect() @@ -1560,41 +1601,51 @@ class TargetTestJob(): _log('failed to extract MITM cert: %s' % str(e), 'debug') return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm' elif et == rocksock.RS_ET_SSL and ssl_only_check: - # SSL handshake failed (but proxy protocol worked) - verify with HTTP - # Only for 'ssl' checktype; cert errors handled above as MITM - try: - sock.disconnect() - except Exception: - pass - # Delay before secondary check (allows different Tor circuit) - time.sleep(0.3) - if config.watchd.debug: - _log('SSL handshake failed, fallback to HTTP: %s://%s:%d' % (proto, ps.ip, ps.port), 'debug') - try: - # Secondary check via Tor Project API (plain HTTP) - tor_check_host = 'check.torproject.org' - if ps.auth: - fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) - else: - fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) - fallback_proxies = [ - rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), - rocksock.RocksockProxyFromURL(fallback_proxy_url), - ] - fallback_sock = rocksock.Rocksock(host=tor_check_host, port=80, ssl=0, - proxies=fallback_proxies, timeout=adaptive_timeout) - fallback_sock.connect() - fallback_sock.send('GET /api/ip HTTP/1.0\r\nHost: %s\r\n\r\n' % tor_check_host) - elapsed = time.time() - duration - if pool: - pool.record_success(torhost, elapsed) - return fallback_sock, proto, duration, torhost, tor_check_host + '/api/ip', 0, 0, 'ssl_fallback_tor' - except rocksock.RocksockException: - pass # Fallback failed, continue to next protocol + # SSL handshake failed - check if protocol error vs other error + # fp contains the SSL error reason string + if is_ssl_protocol_error(fp): + # Protocol error (WRONG_VERSION_NUMBER, etc.) - proxy doesn't support SSL + # No fallback needed, just fail this proxy for SSL + if config.watchd.debug: + _log('SSL protocol error, no fallback: %s://%s:%d (%s)' % (proto, ps.ip, ps.port, fp), 'debug') + # Continue to try next protocol + else: + # Other SSL error - verify with HTTP HEAD fallback + try: + sock.disconnect() + except Exception as e: + if config.watchd.debug: + _log('socket disconnect failed: %s' % e, 'debug') + # Delay before secondary check (allows different Tor circuit) + time.sleep(0.3) + if config.watchd.debug: + _log('SSL error, fallback to HTTP HEAD: %s://%s:%d (%s)' % (proto, ps.ip, ps.port, fp), 'debug') + try: + # Secondary check via HTTP HEAD (same as 'head' checktype) + fallback_host = random.choice(list(regexes.keys())) + if ps.auth: + fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) + else: + fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) + fallback_proxies = [ + rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), + rocksock.RocksockProxyFromURL(fallback_proxy_url), + ] + fallback_sock = rocksock.Rocksock(host=fallback_host, port=80, ssl=0, + proxies=fallback_proxies, timeout=adaptive_timeout) + fallback_sock.connect() + fallback_sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % fallback_host) + elapsed = time.time() - duration + if pool: + pool.record_success(torhost, elapsed) + return fallback_sock, proto, duration, torhost, fallback_host, 0, 0, 'ssl_fallback_head' + except rocksock.RocksockException: + pass # Fallback failed, continue to next protocol except KeyboardInterrupt as e: raise e + _sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy) return None, None, None, None, None, 1, use_ssl, last_error_category @@ -1853,10 +1904,7 @@ class Proxywatchd(): elif ct == 'ssl': target_pools[ct] = ssl_targets _dbg('target_pool[ssl]: %d targets' % len(ssl_targets)) - elif ct == 'tor': - target_pools[ct] = tor_targets - _dbg('target_pool[tor]: %d targets' % len(tor_targets)) - else: # http/head + else: # head target_pools[ct] = list(regexes.keys()) _dbg('target_pool[%s]: %d targets' % (ct, len(regexes))) @@ -2135,7 +2183,9 @@ class Proxywatchd(): stats_data['queue_size'] = self.job_queue.qsize() stats_data['checktype'] = ','.join(config.watchd.checktypes) stats_data['checktypes'] = config.watchd.checktypes - stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False + stats_data['profiling'] = ( + getattr(config.args, 'profile', False) if hasattr(config, 'args') else False + ) or getattr(config.common, 'profiling', False) stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) # Tor pool stats