watchd: configurable adaptive timeout per failure
This commit is contained in:
@@ -41,6 +41,10 @@ min_threads = 5
|
|||||||
# Timeout for proxy test connections (seconds)
|
# Timeout for proxy test connections (seconds)
|
||||||
timeout = 15
|
timeout = 15
|
||||||
|
|
||||||
|
# Adaptive timeout: extra time per failure (seconds)
|
||||||
|
timeout_fail_inc = 1.5
|
||||||
|
timeout_fail_max = 15
|
||||||
|
|
||||||
# Number of failures before proxy marked dead
|
# Number of failures before proxy marked dead
|
||||||
max_fail = 5
|
max_fail = 5
|
||||||
|
|
||||||
|
|||||||
@@ -97,6 +97,8 @@ class Config(ComboParser):
|
|||||||
self.add_item(section, 'threads', int, 10, 'number of threads watchd uses to check proxies', True)
|
self.add_item(section, 'threads', int, 10, 'number of threads watchd uses to check proxies', True)
|
||||||
self.add_item(section, 'min_threads', int, 0, 'minimum threads (0 = auto: threads/4)', False)
|
self.add_item(section, 'min_threads', int, 0, 'minimum threads (0 = auto: threads/4)', False)
|
||||||
self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False)
|
self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False)
|
||||||
|
self.add_item(section, 'timeout_fail_inc', float, 1.5, 'extra timeout per failure (default: 1.5)', False)
|
||||||
|
self.add_item(section, 'timeout_fail_max', float, 15, 'max extra timeout for failures (default: 15)', False)
|
||||||
self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False)
|
self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False)
|
||||||
self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False)
|
self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False)
|
||||||
self.add_item(section, 'use_ssl', int, 1, 'whether to use SSL (1=always, 0=never, 2=random)', False)
|
self.add_item(section, 'use_ssl', int, 1, 'whether to use SSL (1=always, 0=never, 2=random)', False)
|
||||||
|
|||||||
@@ -57,6 +57,16 @@ config = Config()
|
|||||||
_run_standalone = False
|
_run_standalone = False
|
||||||
cached_dns = {}
|
cached_dns = {}
|
||||||
|
|
||||||
|
# Debug mode for proxy check path - set via PPF_DEBUG env or config
|
||||||
|
_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')
|
||||||
|
|
||||||
|
def _dbg(msg, proxy=None):
|
||||||
|
"""Debug log for proxy check path. Only logs when PPF_DEBUG=1."""
|
||||||
|
if _debug_proxy:
|
||||||
|
prefix = '[%s] ' % proxy if proxy else ''
|
||||||
|
# Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default)
|
||||||
|
_log('%s%s' % (prefix, msg), 'dbg')
|
||||||
|
|
||||||
# IP pattern for judge services (validates response contains valid IP in body)
|
# IP pattern for judge services (validates response contains valid IP in body)
|
||||||
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
|
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
|
||||||
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
|
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
|
||||||
@@ -1139,6 +1149,7 @@ class ProxyTestState():
|
|||||||
|
|
||||||
When all target tests complete, signals via completion_queue.
|
When all target tests complete, signals via completion_queue.
|
||||||
"""
|
"""
|
||||||
|
_dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy)
|
||||||
should_signal = False
|
should_signal = False
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.results.append({
|
self.results.append({
|
||||||
@@ -1203,6 +1214,7 @@ class ProxyTestState():
|
|||||||
successes = [r for r in self.results if r['success']]
|
successes = [r for r in self.results if r['success']]
|
||||||
failures = [r for r in self.results if not r['success']]
|
failures = [r for r in self.results if not r['success']]
|
||||||
num_success = len(successes)
|
num_success = len(successes)
|
||||||
|
_dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
|
||||||
|
|
||||||
# Determine dominant failure category
|
# Determine dominant failure category
|
||||||
fail_category = None
|
fail_category = None
|
||||||
@@ -1274,6 +1286,7 @@ class ProxyTestState():
|
|||||||
last_good['proto'], self.ip, self.port, self.country,
|
last_good['proto'], self.ip, self.port, self.country,
|
||||||
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
|
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
|
||||||
num_success, self.num_targets), 'xxxxx')
|
num_success, self.num_targets), 'xxxxx')
|
||||||
|
_dbg('PASS: failcount=0', self.proxy)
|
||||||
return (True, None)
|
return (True, None)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -1281,6 +1294,7 @@ class ProxyTestState():
|
|||||||
self.failcount += 1
|
self.failcount += 1
|
||||||
self.consecutive_success = 0
|
self.consecutive_success = 0
|
||||||
self.last_fail_category = fail_category
|
self.last_fail_category = fail_category
|
||||||
|
_dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy)
|
||||||
return (False, fail_category)
|
return (False, fail_category)
|
||||||
|
|
||||||
|
|
||||||
@@ -1299,7 +1313,9 @@ class TargetTestJob():
|
|||||||
def run(self):
|
def run(self):
|
||||||
"""Test the proxy against this job's target server."""
|
"""Test the proxy against this job's target server."""
|
||||||
network_stats.set_category('proxy')
|
network_stats.set_category('proxy')
|
||||||
|
_dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
|
||||||
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
|
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
|
||||||
|
_dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy)
|
||||||
|
|
||||||
if not sock:
|
if not sock:
|
||||||
# SSL-only check passed (handshake success, no request needed)
|
# SSL-only check passed (handshake success, no request needed)
|
||||||
@@ -1336,8 +1352,10 @@ class TargetTestJob():
|
|||||||
else: # http
|
else: # http
|
||||||
regex = regexes[srv]
|
regex = regexes[srv]
|
||||||
|
|
||||||
|
_dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), self.proxy_state.proxy)
|
||||||
if re.search(regex, recv, re.IGNORECASE):
|
if re.search(regex, recv, re.IGNORECASE):
|
||||||
elapsed = time.time() - duration
|
elapsed = time.time() - duration
|
||||||
|
_dbg('regex MATCH, elapsed=%.2fs' % elapsed, self.proxy_state.proxy)
|
||||||
# Extract exit IP from judge response
|
# Extract exit IP from judge response
|
||||||
exit_ip = None
|
exit_ip = None
|
||||||
reveals_headers = None
|
reveals_headers = None
|
||||||
@@ -1357,12 +1375,14 @@ class TargetTestJob():
|
|||||||
reveals_headers=reveals_headers
|
reveals_headers=reveals_headers
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
_dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy)
|
||||||
# Check if judge is blocking us (not a proxy failure)
|
# Check if judge is blocking us (not a proxy failure)
|
||||||
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
|
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
|
||||||
judge_stats.record_block(srv)
|
judge_stats.record_block(srv)
|
||||||
# Judge block = proxy worked, we got HTTP response, just no IP
|
# Judge block = proxy worked, we got HTTP response, just no IP
|
||||||
# Count as success without exit_ip
|
# Count as success without exit_ip
|
||||||
block_elapsed = time.time() - duration
|
block_elapsed = time.time() - duration
|
||||||
|
_dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
|
||||||
self.proxy_state.record_result(
|
self.proxy_state.record_result(
|
||||||
True, proto=proto, duration=block_elapsed,
|
True, proto=proto, duration=block_elapsed,
|
||||||
srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
|
srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
|
||||||
@@ -1372,6 +1392,7 @@ class TargetTestJob():
|
|||||||
_log('judge %s challenged proxy %s (counted as success)' % (
|
_log('judge %s challenged proxy %s (counted as success)' % (
|
||||||
srv, self.proxy_state.proxy), 'debug')
|
srv, self.proxy_state.proxy), 'debug')
|
||||||
else:
|
else:
|
||||||
|
_dbg('FAIL: no match, no block', self.proxy_state.proxy)
|
||||||
if self.checktype == 'judges':
|
if self.checktype == 'judges':
|
||||||
judge_stats.record_failure(srv)
|
judge_stats.record_failure(srv)
|
||||||
self.proxy_state.record_result(False, category='other')
|
self.proxy_state.record_result(False, category='other')
|
||||||
@@ -1387,6 +1408,7 @@ class TargetTestJob():
|
|||||||
def _connect_and_test(self):
|
def _connect_and_test(self):
|
||||||
"""Connect to target through the proxy and send test packet."""
|
"""Connect to target through the proxy and send test packet."""
|
||||||
ps = self.proxy_state
|
ps = self.proxy_state
|
||||||
|
_dbg('_connect_and_test: target=%s checktype=%s' % (self.target_srv, self.checktype), ps.proxy)
|
||||||
srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
|
srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
|
||||||
# For judges, extract host from 'host/path' format
|
# For judges, extract host from 'host/path' format
|
||||||
if self.checktype == 'judges' and '/' in srvname:
|
if self.checktype == 'judges' and '/' in srvname:
|
||||||
@@ -1443,15 +1465,18 @@ class TargetTestJob():
|
|||||||
rocksock.RocksockProxyFromURL(proxy_url),
|
rocksock.RocksockProxyFromURL(proxy_url),
|
||||||
]
|
]
|
||||||
|
|
||||||
# Adaptive timeout: give proxies with failures slightly more time
|
# Adaptive timeout: give proxies with failures more time
|
||||||
# Linear increase capped at 5s extra (0 fails=base, 10 fails=+5s max)
|
adaptive_timeout = config.watchd.timeout + min(
|
||||||
adaptive_timeout = config.watchd.timeout + min(ps.failcount * 0.5, 5)
|
ps.failcount * config.watchd.timeout_fail_inc,
|
||||||
|
config.watchd.timeout_fail_max)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
|
sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
|
||||||
proxies=proxies, timeout=adaptive_timeout,
|
proxies=proxies, timeout=adaptive_timeout,
|
||||||
verifycert=verifycert)
|
verifycert=verifycert)
|
||||||
|
_dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
|
||||||
sock.connect()
|
sock.connect()
|
||||||
|
_dbg('connected OK', ps.proxy)
|
||||||
|
|
||||||
# SSL-only check: handshake passed, no request needed
|
# SSL-only check: handshake passed, no request needed
|
||||||
if ssl_only_check:
|
if ssl_only_check:
|
||||||
@@ -1783,15 +1808,12 @@ class Proxywatchd():
|
|||||||
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
||||||
now = time.time()
|
now = time.time()
|
||||||
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||||
if config.watchd.debug:
|
_dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
|
||||||
_log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % (
|
|
||||||
config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime,
|
|
||||||
config.watchd.max_fail, int(now)), 'debug')
|
|
||||||
rows = self.mysqlite.execute(q, params).fetchall()
|
rows = self.mysqlite.execute(q, params).fetchall()
|
||||||
if config.watchd.debug:
|
_dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
|
||||||
_log('fetch_rows: got %d rows' % len(rows), 'debug')
|
|
||||||
# check oldies ?
|
# check oldies ?
|
||||||
if len(rows) < config.watchd.threads:
|
if len(rows) < config.watchd.threads:
|
||||||
|
_dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
|
||||||
rows = []
|
rows = []
|
||||||
if config.watchd.oldies:
|
if config.watchd.oldies:
|
||||||
self.isoldies = True
|
self.isoldies = True
|
||||||
@@ -1805,6 +1827,7 @@ class Proxywatchd():
|
|||||||
## enable tor safeguard by default
|
## enable tor safeguard by default
|
||||||
self.tor_safeguard = config.watchd.tor_safeguard
|
self.tor_safeguard = config.watchd.tor_safeguard
|
||||||
rows = self.fetch_rows()
|
rows = self.fetch_rows()
|
||||||
|
_dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
|
||||||
checktypes = config.watchd.checktypes
|
checktypes = config.watchd.checktypes
|
||||||
num_targets = 1
|
num_targets = 1
|
||||||
|
|
||||||
@@ -1813,6 +1836,7 @@ class Proxywatchd():
|
|||||||
for ct in checktypes:
|
for ct in checktypes:
|
||||||
if ct == 'irc':
|
if ct == 'irc':
|
||||||
target_pools[ct] = config.servers
|
target_pools[ct] = config.servers
|
||||||
|
_dbg('target_pool[irc]: %d servers' % len(config.servers))
|
||||||
elif ct == 'judges':
|
elif ct == 'judges':
|
||||||
# Filter out judges in cooldown (blocked/rate-limited)
|
# Filter out judges in cooldown (blocked/rate-limited)
|
||||||
all_judges = list(judges.keys())
|
all_judges = list(judges.keys())
|
||||||
@@ -1820,8 +1844,10 @@ class Proxywatchd():
|
|||||||
target_pools[ct] = available if available else all_judges
|
target_pools[ct] = available if available else all_judges
|
||||||
elif ct == 'ssl':
|
elif ct == 'ssl':
|
||||||
target_pools[ct] = ssl_targets
|
target_pools[ct] = ssl_targets
|
||||||
|
_dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
|
||||||
else: # http/head
|
else: # http/head
|
||||||
target_pools[ct] = list(regexes.keys())
|
target_pools[ct] = list(regexes.keys())
|
||||||
|
_dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))
|
||||||
|
|
||||||
# create all jobs first, then shuffle for interleaving
|
# create all jobs first, then shuffle for interleaving
|
||||||
all_jobs = []
|
all_jobs = []
|
||||||
@@ -1871,6 +1897,7 @@ class Proxywatchd():
|
|||||||
self._close_db()
|
self._close_db()
|
||||||
proxy_count = len(new_states)
|
proxy_count = len(new_states)
|
||||||
job_count = len(all_jobs)
|
job_count = len(all_jobs)
|
||||||
|
_dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
|
||||||
now = time.time()
|
now = time.time()
|
||||||
if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
|
if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
|
||||||
_log("created %d jobs for %d proxies (%d targets each)" % (
|
_log("created %d jobs for %d proxies (%d targets each)" % (
|
||||||
@@ -1929,9 +1956,11 @@ class Proxywatchd():
|
|||||||
args = []
|
args = []
|
||||||
latency_updates = [] # (proxy, latency_ms) for successful tests
|
latency_updates = [] # (proxy, latency_ms) for successful tests
|
||||||
max_fail = config.watchd.max_fail
|
max_fail = config.watchd.max_fail
|
||||||
|
_dbg('submit_collected: %d jobs' % len(self.collected))
|
||||||
for job in self.collected:
|
for job in self.collected:
|
||||||
if job.failcount == 0:
|
if job.failcount == 0:
|
||||||
sc += 1
|
sc += 1
|
||||||
|
_dbg('submit OK: failcount=0', job.proxy)
|
||||||
if job.last_latency_ms is not None:
|
if job.last_latency_ms is not None:
|
||||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||||
# Check if proxy should be marked as permanently dead
|
# Check if proxy should be marked as permanently dead
|
||||||
|
|||||||
Reference in New Issue
Block a user