watchd: add protocol fingerprint probes and fix nullable counters
Add lightweight SOCKS5/SOCKS4/HTTP handshake probes to identify proxy protocol before full testing. Guard consecutive_success, success_count, and total_duration against NoneType from worker-reported upserts. Track last_check and last_target for test provenance.
This commit is contained in:
113
proxywatchd.py
113
proxywatchd.py
@@ -299,7 +299,8 @@ class ProxyTestState(object):
|
||||
'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
|
||||
'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
|
||||
'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
|
||||
'cert_error', 'source_proto', 'protos_working'
|
||||
'cert_error', 'source_proto', 'protos_working',
|
||||
'last_check', 'last_target'
|
||||
)
|
||||
|
||||
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
|
||||
@@ -343,6 +344,9 @@ class ProxyTestState(object):
|
||||
# Protocol fingerprinting
|
||||
self.source_proto = source_proto
|
||||
self.protos_working = None
|
||||
# Test provenance
|
||||
self.last_check = None
|
||||
self.last_target = None
|
||||
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
|
||||
"""Record a single target test result. Thread-safe.
|
||||
@@ -480,9 +484,9 @@ class ProxyTestState(object):
|
||||
# and only if this test didn't detect MITM
|
||||
if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error:
|
||||
self.mitm = 0
|
||||
self.consecutive_success += 1
|
||||
self.success_count += 1
|
||||
self.total_duration += int(last_good['duration'] * 1000)
|
||||
self.consecutive_success = (self.consecutive_success or 0) + 1
|
||||
self.success_count = (self.success_count or 0) + 1
|
||||
self.total_duration = (self.total_duration or 0) + int(last_good['duration'] * 1000)
|
||||
|
||||
# Calculate average latency from successful tests (in ms)
|
||||
durations = [s['duration'] for s in successes if s['duration']]
|
||||
@@ -543,6 +547,9 @@ class TargetTestJob(object):
|
||||
_log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter,
|
||||
self.proxy_state.proxy, self.target_srv, self.checktype), 'info')
|
||||
network_stats.set_category('proxy')
|
||||
# Track test provenance (overwritten on each attempt, last success wins)
|
||||
self.proxy_state.last_check = self.checktype
|
||||
self.proxy_state.last_target = self.target_srv
|
||||
_dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
|
||||
_dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy)
|
||||
@@ -682,6 +689,90 @@ class TargetTestJob(object):
|
||||
protos.append(p)
|
||||
return protos
|
||||
|
||||
def _fingerprint_protocol(self, pool):
|
||||
"""Identify proxy protocol via lightweight handshake probes.
|
||||
|
||||
Sends protocol-specific greeting bytes directly to the proxy
|
||||
and identifies the protocol from the response pattern.
|
||||
|
||||
Returns: 'socks5', 'socks4', 'http', or None
|
||||
"""
|
||||
ps = self.proxy_state
|
||||
fp_timeout = min(config.watchd.timeout, 5)
|
||||
torhost = pool.get_tor_host(self.worker_id) if pool else random.choice(config.torhosts)
|
||||
|
||||
for probe_fn, name in (
|
||||
(self._probe_socks5, 'socks5'),
|
||||
(self._probe_socks4, 'socks4'),
|
||||
(self._probe_http, 'http'),
|
||||
):
|
||||
result = probe_fn(ps, torhost, fp_timeout)
|
||||
if result:
|
||||
_sample_dbg('fingerprint: %s detected' % result, ps.proxy)
|
||||
return result
|
||||
return None
|
||||
|
||||
def _probe_socks5(self, ps, torhost, timeout):
|
||||
"""Probe for SOCKS5 protocol. Returns 'socks5' or None."""
|
||||
try:
|
||||
sock = rocksock.Rocksock(
|
||||
host=ps.ip, port=int(ps.port),
|
||||
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
||||
timeout=timeout
|
||||
)
|
||||
sock.connect()
|
||||
sock.send('\x05\x01\x00')
|
||||
res = sock.recv(2)
|
||||
sock.disconnect()
|
||||
if len(res) >= 1 and res[0] == '\x05':
|
||||
return 'socks5'
|
||||
except rocksock.RocksockException:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
return None
|
||||
|
||||
def _probe_socks4(self, ps, torhost, timeout):
|
||||
"""Probe for SOCKS4 protocol. Returns 'socks4' or None."""
|
||||
try:
|
||||
sock = rocksock.Rocksock(
|
||||
host=ps.ip, port=int(ps.port),
|
||||
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
||||
timeout=timeout
|
||||
)
|
||||
sock.connect()
|
||||
# CONNECT 1.1.1.1:80
|
||||
sock.send('\x04\x01\x00\x50\x01\x01\x01\x01\x00')
|
||||
res = sock.recv(2)
|
||||
sock.disconnect()
|
||||
if len(res) >= 2 and ord(res[0]) == 0 and ord(res[1]) in (0x5a, 0x5b, 0x5c, 0x5d):
|
||||
return 'socks4'
|
||||
except rocksock.RocksockException:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
return None
|
||||
|
||||
def _probe_http(self, ps, torhost, timeout):
|
||||
"""Probe for HTTP CONNECT protocol. Returns 'http' or None."""
|
||||
try:
|
||||
sock = rocksock.Rocksock(
|
||||
host=ps.ip, port=int(ps.port),
|
||||
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
||||
timeout=timeout
|
||||
)
|
||||
sock.connect()
|
||||
sock.send('CONNECT 1.1.1.1:80 HTTP/1.1\r\nHost: 1.1.1.1:80\r\n\r\n')
|
||||
res = sock.recv(13)
|
||||
sock.disconnect()
|
||||
if res.startswith('HTTP/'):
|
||||
return 'http'
|
||||
except rocksock.RocksockException:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
return None
|
||||
|
||||
def _connect_and_test(self):
|
||||
"""Connect to target through the proxy and send test packet.
|
||||
|
||||
@@ -702,6 +793,12 @@ class TargetTestJob(object):
|
||||
protos = self._build_proto_order()
|
||||
pool = connection_pool.get_pool()
|
||||
|
||||
# Fingerprint unknown proxies to avoid brute-force protocol guessing
|
||||
if ps.proto is None and config.watchd.fingerprint:
|
||||
detected = self._fingerprint_protocol(pool)
|
||||
if detected:
|
||||
protos = [detected] + [p for p in protos if p != detected]
|
||||
|
||||
# Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
|
||||
if config.watchd.ssl_first or self.checktype == 'none':
|
||||
result = self._try_ssl_handshake(protos, pool)
|
||||
@@ -1519,7 +1616,8 @@ class Proxywatchd():
|
||||
dead_count += 1
|
||||
args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.asn, job.protos_working, job.proxy))
|
||||
job.consecutive_success, job.asn, job.protos_working,
|
||||
job.last_check, job.last_target, job.proxy))
|
||||
|
||||
success_rate = (float(sc) / len(self.collected)) * 100
|
||||
ret = True
|
||||
@@ -1533,7 +1631,8 @@ class Proxywatchd():
|
||||
if job.failcount == 0:
|
||||
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.asn, job.protos_working, job.proxy))
|
||||
job.consecutive_success, job.asn, job.protos_working,
|
||||
job.last_check, job.last_target, job.proxy))
|
||||
if job.last_latency_ms is not None:
|
||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||
ret = False
|
||||
@@ -1550,7 +1649,7 @@ class Proxywatchd():
|
||||
if job.failcount == 0 and job.exit_ip]
|
||||
|
||||
with self._db_context() as db:
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?'
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=? WHERE proxy=?'
|
||||
db.executemany(query, args)
|
||||
|
||||
# Batch update latency metrics for successful proxies
|
||||
|
||||
Reference in New Issue
Block a user