diff --git a/config.ini.sample b/config.ini.sample
index 919a0ad..654709f 100644
--- a/config.ini.sample
+++ b/config.ini.sample
@@ -41,6 +41,10 @@ min_threads = 5
 
 # Timeout for proxy test connections (seconds)
 timeout = 15
+# Adaptive timeout: extra time per failure (seconds)
+timeout_fail_inc = 1.5
+timeout_fail_max = 15
+
 # Number of failures before proxy marked dead
 max_fail = 5
 
diff --git a/config.py b/config.py
index e571503..1d35541 100644
--- a/config.py
+++ b/config.py
@@ -97,6 +97,8 @@ class Config(ComboParser):
         self.add_item(section, 'threads', int, 10, 'number of threads watchd uses to check proxies', True)
         self.add_item(section, 'min_threads', int, 0, 'minimum threads (0 = auto: threads/4)', False)
         self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False)
+        self.add_item(section, 'timeout_fail_inc', float, 1.5, 'extra timeout per failure (default: 1.5)', False)
+        self.add_item(section, 'timeout_fail_max', float, 15, 'max extra timeout for failures (default: 15)', False)
         self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False)
         self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False)
         self.add_item(section, 'use_ssl', int, 1, 'whether to use SSL (1=always, 0=never, 2=random)', False)
diff --git a/proxywatchd.py b/proxywatchd.py
index 043d06f..6d66d80 100644
--- a/proxywatchd.py
+++ b/proxywatchd.py
@@ -57,6 +57,16 @@ config = Config()
 _run_standalone = False
 cached_dns = {}
 
+# Debug mode for proxy check path - set via PPF_DEBUG env or config
+_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')
+
+def _dbg(msg, proxy=None):
+    """Debug log for proxy check path. Only logs when PPF_DEBUG=1."""
+    if _debug_proxy:
+        prefix = '[%s] ' % proxy if proxy else ''
+        # Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default)
+        _log('%s%s' % (prefix, msg), 'dbg')
+
 # IP pattern for judge services (validates response contains valid IP in body)
 IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
 # Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
@@ -1139,6 +1149,7 @@ class ProxyTestState():
 
         When all target tests complete, signals via completion_queue.
         """
+        _dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy)
         should_signal = False
         with self.lock:
             self.results.append({
@@ -1203,6 +1214,7 @@ class ProxyTestState():
         successes = [r for r in self.results if r['success']]
         failures = [r for r in self.results if not r['success']]
         num_success = len(successes)
+        _dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
 
         # Determine dominant failure category
         fail_category = None
@@ -1274,6 +1286,7 @@ class ProxyTestState():
                 last_good['proto'], self.ip, self.port, self.country, last_good['duration'],
                 torstats, anon_status, last_good['srv'], str(last_good['ssl']),
                 num_success, self.num_targets), 'xxxxx')
+            _dbg('PASS: failcount=0', self.proxy)
             return (True, None)
 
         else:
@@ -1281,6 +1294,7 @@ class ProxyTestState():
             self.failcount += 1
             self.consecutive_success = 0
             self.last_fail_category = fail_category
+            _dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy)
 
             return (False, fail_category)
 
@@ -1299,7 +1313,9 @@ class TargetTestJob():
     def run(self):
         """Test the proxy against this job's target server."""
         network_stats.set_category('proxy')
+        _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
         sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
+        _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy)
 
         if not sock:
             # SSL-only check passed (handshake success, no request needed)
@@ -1336,8 +1352,10 @@ class TargetTestJob():
         else:
             # http
             regex = regexes[srv]
+            _dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), self.proxy_state.proxy)
             if re.search(regex, recv, re.IGNORECASE):
                 elapsed = time.time() - duration
+                _dbg('regex MATCH, elapsed=%.2fs' % elapsed, self.proxy_state.proxy)
                 # Extract exit IP from judge response
                 exit_ip = None
                 reveals_headers = None
@@ -1357,12 +1375,14 @@ class TargetTestJob():
                     reveals_headers=reveals_headers
                 )
             else:
+                _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy)
                 # Check if judge is blocking us (not a proxy failure)
                 if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                     judge_stats.record_block(srv)
                     # Judge block = proxy worked, we got HTTP response, just no IP
                     # Count as success without exit_ip
                     block_elapsed = time.time() - duration
+                    _dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
                     self.proxy_state.record_result(
                         True, proto=proto, duration=block_elapsed, srv=srv,
                         tor=tor, ssl=is_ssl, exit_ip=None,
@@ -1372,6 +1392,7 @@ class TargetTestJob():
                     _log('judge %s challenged proxy %s (counted as success)' % (
                         srv, self.proxy_state.proxy), 'debug')
                 else:
+                    _dbg('FAIL: no match, no block', self.proxy_state.proxy)
                     if self.checktype == 'judges':
                         judge_stats.record_failure(srv)
                     self.proxy_state.record_result(False, category='other')
@@ -1387,6 +1408,7 @@ class TargetTestJob():
     def _connect_and_test(self):
         """Connect to target through the proxy and send test packet."""
         ps = self.proxy_state
+        _dbg('_connect_and_test: target=%s checktype=%s' % (self.target_srv, self.checktype), ps.proxy)
         srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
         # For judges, extract host from 'host/path' format
         if self.checktype == 'judges' and '/' in srvname:
@@ -1443,15 +1465,18 @@ class TargetTestJob():
             rocksock.RocksockProxyFromURL(proxy_url),
         ]
 
-        # Adaptive timeout: give proxies with failures slightly more time
-        # Linear increase capped at 5s extra (0 fails=base, 10 fails=+5s max)
-        adaptive_timeout = config.watchd.timeout + min(ps.failcount * 0.5, 5)
+        # Adaptive timeout: give proxies with failures more time
+        adaptive_timeout = config.watchd.timeout + min(
+            ps.failcount * config.watchd.timeout_fail_inc,
+            config.watchd.timeout_fail_max)
 
         try:
             sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                      proxies=proxies, timeout=adaptive_timeout,
                                      verifycert=verifycert)
+            _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
             sock.connect()
+            _dbg('connected OK', ps.proxy)
 
             # SSL-only check: handshake passed, no request needed
             if ssl_only_check:
@@ -1783,15 +1808,12 @@ class Proxywatchd():
         q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
         now = time.time()
         params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
-        if config.watchd.debug:
-            _log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % (
-                config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime,
-                config.watchd.max_fail, int(now)), 'debug')
+        _dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
         rows = self.mysqlite.execute(q, params).fetchall()
-        if config.watchd.debug:
-            _log('fetch_rows: got %d rows' % len(rows), 'debug')
+        _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
         # check oldies ?
         if len(rows) < config.watchd.threads:
+            _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
             rows = []
             if config.watchd.oldies:
                 self.isoldies = True
@@ -1805,6 +1827,7 @@ class Proxywatchd():
         ## enable tor safeguard by default
         self.tor_safeguard = config.watchd.tor_safeguard
         rows = self.fetch_rows()
+        _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
        checktypes = config.watchd.checktypes
 
         num_targets = 1
@@ -1813,6 +1836,7 @@ class Proxywatchd():
         for ct in checktypes:
             if ct == 'irc':
                 target_pools[ct] = config.servers
+                _dbg('target_pool[irc]: %d servers' % len(config.servers))
             elif ct == 'judges':
                 # Filter out judges in cooldown (blocked/rate-limited)
                 all_judges = list(judges.keys())
@@ -1820,8 +1844,10 @@ class Proxywatchd():
                 target_pools[ct] = available if available else all_judges
             elif ct == 'ssl':
                 target_pools[ct] = ssl_targets
+                _dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
             else:  # http/head
                 target_pools[ct] = list(regexes.keys())
+                _dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))
 
         # create all jobs first, then shuffle
         all_jobs = []
@@ -1871,6 +1897,7 @@ class Proxywatchd():
         self._close_db()
         proxy_count = len(new_states)
         job_count = len(all_jobs)
+        _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
         now = time.time()
         if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
             _log("created %d jobs for %d proxies (%d targets each)" % (
@@ -1929,9 +1956,11 @@ class Proxywatchd():
         args = []
         latency_updates = []  # (proxy, latency_ms) for successful tests
         max_fail = config.watchd.max_fail
+        _dbg('submit_collected: %d jobs' % len(self.collected))
         for job in self.collected:
             if job.failcount == 0:
                 sc += 1
+                _dbg('submit OK: failcount=0', job.proxy)
                 if job.last_latency_ms is not None:
                     latency_updates.append((job.proxy, job.last_latency_ms))
                 # Check if proxy should be marked as permanently dead