proxywatchd: add DNS cache TTL, IP validation, debug logging

- DNS cache entries now expire after 1 hour (DNS_CACHE_TTL)
- Add is_valid_ip() to validate extracted IPs have valid octets
- Add debug logging to silent exception blocks (ASN lookup, socket disconnect)
- Convert rwip() to static method with cleaner int() implementation
- Add _sample_dbg() for sampled diagnostic logging
- Add set_config() for worker mode imports
- Remove unused tor_targets (tor checktype was removed)
- Move tor_proxy_url to misc.py for shared use
This commit is contained in:
Username
2025-12-28 14:12:50 +01:00
parent 58d83ae52f
commit e46a3f6ef1

View File

@@ -40,7 +40,7 @@ from config import Config
import mysqlite
import dbs
from misc import _log, categorize_error
from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error
import rocksock
import connection_pool
@@ -54,12 +54,23 @@ except ImportError:
config = Config()
def set_config(cfg):
"""Set the config object (used when imported from ppf.py)."""
global config
config = cfg
_run_standalone = False
cached_dns = {}
cached_dns = {} # {hostname: (ip, timestamp)}
DNS_CACHE_TTL = 3600 # 1 hour
# Debug mode for proxy check path - set via PPF_DEBUG env or config
_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')
# Sampling debug: print detailed diagnostics for every Nth test
_sample_debug_interval = 50 # Print debug for every 50th test (lowered for diagnosis)
_sample_debug_counter = 0
_sample_debug_lock = threading.Lock()
def _dbg(msg, proxy=None):
"""Debug log for proxy check path. Only logs when PPF_DEBUG=1."""
if _debug_proxy:
@@ -67,8 +78,31 @@ def _dbg(msg, proxy=None):
# Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default)
_log('%s%s' % (prefix, msg), 'dbg')
def _sample_dbg(msg, proxy=None, force=False):
"""Sampled debug: log every Nth test for diagnostics without flooding."""
global _sample_debug_counter
should_log = force
if not should_log:
with _sample_debug_lock:
_sample_debug_counter += 1
if _sample_debug_counter >= _sample_debug_interval:
_sample_debug_counter = 0
should_log = True
if should_log:
prefix = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
_log('%s%s' % (prefix, msg), 'diag')
# IP pattern for judge services (validates response contains valid IP in body)
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
def is_valid_ip(ip_str):
"""Validate IP address octets are 0-255."""
try:
parts = ip_str.split('.')
return len(parts) == 4 and all(0 <= int(p) <= 255 for p in parts)
except (ValueError, AttributeError):
return False
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
@@ -273,13 +307,6 @@ ssl_targets = [
'www.letsencrypt.org',
]
# Tor check targets - verify proxy exits through Tor network
# Response contains JSON with IsTor: true/false
tor_targets = [
'check.torproject.org/api/ip',
]
class Stats():
"""Track and report comprehensive runtime statistics."""
@@ -301,8 +328,7 @@ class Stats():
self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto
# Legacy alias for compatibility
self.by_proto = self.proto_passed
self.by_proto = self.proto_passed # Alias for dashboard API
# Time series history (5s intervals)
self.rate_history = []
@@ -399,6 +425,11 @@ class Stats():
# Track failures by protocol
if proto and proto in self.proto_failed:
self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1
# Log failure category breakdown every 1000 failures
if self.failed % 1000 == 0:
top_cats = sorted(self.fail_categories.items(), key=lambda x: -x[1])[:5]
cats_str = ', '.join(['%s:%d' % (c, n) for c, n in top_cats])
_log('fail breakdown (%d total): %s' % (self.failed, cats_str), 'diag')
# SSL/TLS tracking
if ssl_test:
@@ -677,19 +708,6 @@ def try_div(a, b):
return 0
def tor_proxy_url(torhost):
"""Generate Tor SOCKS5 proxy URL with random credentials for circuit isolation.
Tor treats different username:password as separate streams, using different
circuits. This ensures each connection gets a fresh circuit.
"""
# 8 random alphanumeric chars for user and pass
chars = string.ascii_lowercase + string.digits
user = ''.join(random.choice(chars) for _ in range(8))
passwd = ''.join(random.choice(chars) for _ in range(8))
return 'socks5://%s:%s@%s' % (user, passwd, torhost)
class MITMCertStats(object):
"""Track MITM certificate statistics."""
@@ -1081,27 +1099,37 @@ class ThreadScaler(object):
current_threads, queue_size, self.target_queue_per_thread)
def socks4_resolve(srvname, server_port):
srv = srvname
if srv in cached_dns:
srv = cached_dns[srvname]
if config.watchd.debug:
_log("using cached ip (%s) for %s"%(srv, srvname), "debug")
else:
dns_fail = False
try:
af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
if sa is not None:
cached_dns[srvname] = sa[0]
srv = sa[0]
else: dns_fail = True
except rocksock.RocksockException as e:
assert(e.get_errortype() == rocksock.RS_ET_GAI)
"""Resolve hostname to IP for SOCKS4 (which requires numeric IP).
Caches results for DNS_CACHE_TTL seconds to avoid repeated lookups.
"""
now = time.time()
# Check cache with TTL
if srvname in cached_dns:
ip, ts = cached_dns[srvname]
if now - ts < DNS_CACHE_TTL:
if config.watchd.debug:
_log("using cached ip (%s) for %s" % (ip, srvname), "debug")
return ip
# Expired - fall through to re-resolve
# Resolve hostname
dns_fail = False
try:
af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
if sa is not None:
cached_dns[srvname] = (sa[0], now)
return sa[0]
else:
dns_fail = True
if dns_fail:
fail_inc = 0
_log("could not resolve connection target %s"%srvname, "ERROR")
return False
return srv
except rocksock.RocksockException as e:
assert(e.get_errortype() == rocksock.RS_ET_GAI)
dns_fail = True
if dns_fail:
_log("could not resolve connection target %s" % srvname, "ERROR")
return False
return srvname
class ProxyTestState():
@@ -1194,14 +1222,10 @@ class ProxyTestState():
with self.lock:
return len(self.results) >= self.num_targets
def rwip(self, ip):
@staticmethod
def rwip(ip):
"""Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10)."""
n = []
for b in ip.split('.'):
while b[0] == '0' and len(b) > 1:
b = b[1:]
n.append(b)
return '.'.join(n)
return '.'.join(str(int(b)) for b in ip.split('.'))
def evaluate(self):
"""Evaluate results after all tests complete.
@@ -1255,8 +1279,9 @@ class ProxyTestState():
asn_result = asndb.lookup(self.ip)
if asn_result and asn_result[0]:
self.asn = asn_result[0]
except Exception:
pass
except Exception as e:
if config.watchd.debug:
_log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')
self.proto = last_good['proto']
self.failcount = 0
@@ -1291,7 +1316,7 @@ class ProxyTestState():
_log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % (
last_good['proto'], self.ip, self.port, self.country,
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
num_success, self.num_targets), 'xxxxx')
num_success, self.num_targets), 'info')
_dbg('PASS: failcount=0', self.proxy)
return (True, None)
@@ -1318,6 +1343,13 @@ class TargetTestJob():
def run(self):
"""Test the proxy against this job's target server."""
# DIAGNOSTIC: Verify run() is being called
global _sample_debug_counter
with _sample_debug_lock:
_sample_debug_counter += 1
if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0:
_log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter,
self.proxy_state.proxy, self.target_srv, self.checktype), 'info')
network_stats.set_category('proxy')
_dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
@@ -1345,17 +1377,16 @@ class TargetTestJob():
try:
recv = sock.recv(-1)
_sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)
# Select regex based on check type (or fallback target)
if 'check.torproject.org' in srv:
# Tor API fallback or tor checktype
# Tor API fallback (judge using torproject.org)
regex = r'"IsTor"\s*:\s*true'
elif self.checktype == 'irc':
regex = '^(:|NOTICE|ERROR)'
elif self.checktype == 'judges':
regex = judges[srv]
elif self.checktype == 'tor':
regex = r'"IsTor"\s*:\s*true'
elif self.checktype == 'ssl':
# Should not reach here - ssl returns before recv
self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
@@ -1370,9 +1401,9 @@ class TargetTestJob():
# Extract exit IP from judge/tor response
exit_ip = None
reveals_headers = None
if self.checktype == 'judges' or self.checktype == 'tor' or 'check.torproject.org' in srv:
if self.checktype == 'judges' or 'check.torproject.org' in srv:
ip_match = re.search(IP_PATTERN, recv)
if ip_match:
if ip_match and is_valid_ip(ip_match.group(0)):
exit_ip = ip_match.group(0)
if self.checktype == 'judges' and 'check.torproject.org' not in srv:
# Check for header echo judge (elite detection)
@@ -1421,9 +1452,14 @@ class TargetTestJob():
"""Connect to target through the proxy and send test packet."""
ps = self.proxy_state
_dbg('_connect_and_test: target=%s checktype=%s' % (self.target_srv, self.checktype), ps.proxy)
# Always log first test to verify code path
global _sample_debug_counter
if _sample_debug_counter == 0:
_log('FIRST TEST: proxy=%s target=%s check=%s' % (ps.proxy, self.target_srv, self.checktype), 'info')
_sample_dbg('TEST START: proxy=%s target=%s check=%s' % (ps.proxy, self.target_srv, self.checktype), ps.proxy)
srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
# For judges/tor, extract host from 'host/path' format
if (self.checktype == 'judges' or self.checktype == 'tor') and '/' in srvname:
# For judges, extract host from 'host/path' format
if self.checktype == 'judges' and '/' in srvname:
connect_host = srvname.split('/')[0]
else:
connect_host = srvname
@@ -1435,7 +1471,7 @@ class TargetTestJob():
server_port = 443
verifycert = True
else:
# head, judges, tor, irc: always use plain HTTP
# head, judges, irc: always use plain HTTP
use_ssl = 0
ssl_only_check = False
verifycert = False
@@ -1483,8 +1519,11 @@ class TargetTestJob():
proxies=proxies, timeout=adaptive_timeout,
verifycert=verifycert)
_dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
_sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % (
torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy)
sock.connect()
_dbg('connected OK', ps.proxy)
_sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy)
# SSL-only check: handshake passed, no request needed
if ssl_only_check:
@@ -1496,13 +1535,13 @@ class TargetTestJob():
if self.checktype == 'irc':
sock.send('NICK\n')
elif self.checktype == 'judges' or self.checktype == 'tor':
# GET request to receive body (IP for judges, JSON for tor)
elif self.checktype == 'judges':
# GET request to receive body (IP)
sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
srvname.split('/', 1)[1] if '/' in srvname else '',
srvname.split('/')[0]
))
else: # http - HEAD is sufficient for header checks
else: # head - HEAD is sufficient for header checks
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
# Record success in pool
if pool:
@@ -1518,6 +1557,8 @@ class TargetTestJob():
et = e.get_errortype()
err = e.get_error()
fp = e.get_failedproxy()
_sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % (
ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy)
sock.disconnect()
@@ -1560,41 +1601,51 @@ class TargetTestJob():
_log('failed to extract MITM cert: %s' % str(e), 'debug')
return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm'
elif et == rocksock.RS_ET_SSL and ssl_only_check:
# SSL handshake failed (but proxy protocol worked) - verify with HTTP
# Only for 'ssl' checktype; cert errors handled above as MITM
try:
sock.disconnect()
except Exception:
pass
# Delay before secondary check (allows different Tor circuit)
time.sleep(0.3)
if config.watchd.debug:
_log('SSL handshake failed, fallback to HTTP: %s://%s:%d' % (proto, ps.ip, ps.port), 'debug')
try:
# Secondary check via Tor Project API (plain HTTP)
tor_check_host = 'check.torproject.org'
if ps.auth:
fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
else:
fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
fallback_proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL(fallback_proxy_url),
]
fallback_sock = rocksock.Rocksock(host=tor_check_host, port=80, ssl=0,
proxies=fallback_proxies, timeout=adaptive_timeout)
fallback_sock.connect()
fallback_sock.send('GET /api/ip HTTP/1.0\r\nHost: %s\r\n\r\n' % tor_check_host)
elapsed = time.time() - duration
if pool:
pool.record_success(torhost, elapsed)
return fallback_sock, proto, duration, torhost, tor_check_host + '/api/ip', 0, 0, 'ssl_fallback_tor'
except rocksock.RocksockException:
pass # Fallback failed, continue to next protocol
# SSL handshake failed - check if protocol error vs other error
# fp contains the SSL error reason string
if is_ssl_protocol_error(fp):
# Protocol error (WRONG_VERSION_NUMBER, etc.) - proxy doesn't support SSL
# No fallback needed, just fail this proxy for SSL
if config.watchd.debug:
_log('SSL protocol error, no fallback: %s://%s:%d (%s)' % (proto, ps.ip, ps.port, fp), 'debug')
# Continue to try next protocol
else:
# Other SSL error - verify with HTTP HEAD fallback
try:
sock.disconnect()
except Exception as e:
if config.watchd.debug:
_log('socket disconnect failed: %s' % e, 'debug')
# Delay before secondary check (allows different Tor circuit)
time.sleep(0.3)
if config.watchd.debug:
_log('SSL error, fallback to HTTP HEAD: %s://%s:%d (%s)' % (proto, ps.ip, ps.port, fp), 'debug')
try:
# Secondary check via HTTP HEAD (same as 'head' checktype)
fallback_host = random.choice(list(regexes.keys()))
if ps.auth:
fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
else:
fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
fallback_proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL(fallback_proxy_url),
]
fallback_sock = rocksock.Rocksock(host=fallback_host, port=80, ssl=0,
proxies=fallback_proxies, timeout=adaptive_timeout)
fallback_sock.connect()
fallback_sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % fallback_host)
elapsed = time.time() - duration
if pool:
pool.record_success(torhost, elapsed)
return fallback_sock, proto, duration, torhost, fallback_host, 0, 0, 'ssl_fallback_head'
except rocksock.RocksockException:
pass # Fallback failed, continue to next protocol
except KeyboardInterrupt as e:
raise e
_sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy)
return None, None, None, None, None, 1, use_ssl, last_error_category
@@ -1853,10 +1904,7 @@ class Proxywatchd():
elif ct == 'ssl':
target_pools[ct] = ssl_targets
_dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
elif ct == 'tor':
target_pools[ct] = tor_targets
_dbg('target_pool[tor]: %d targets' % len(tor_targets))
else: # http/head
else: # head
target_pools[ct] = list(regexes.keys())
_dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))
@@ -2135,7 +2183,9 @@ class Proxywatchd():
stats_data['queue_size'] = self.job_queue.qsize()
stats_data['checktype'] = ','.join(config.watchd.checktypes)
stats_data['checktypes'] = config.watchd.checktypes
stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
stats_data['profiling'] = (
getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
) or getattr(config.common, 'profiling', False)
stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)
# Tor pool stats