From 4c5f4fa01d25de9d2e342f6b9e06c6be59eda030 Mon Sep 17 00:00:00 2001 From: Username Date: Tue, 17 Feb 2026 21:06:16 +0100 Subject: [PATCH] watchd: add protocol fingerprint probes and fix nullable counters Add lightweight SOCKS5/SOCKS4/HTTP handshake probes to identify proxy protocol before full testing. Guard consecutive_success, success_count, and total_duration against NoneType from worker-reported upserts. Track last_check and last_target for test provenance. --- proxywatchd.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 7 deletions(-) diff --git a/proxywatchd.py b/proxywatchd.py index 5c5f73c..4ef16e8 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -299,7 +299,8 @@ class ProxyTestState(object): 'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed', 'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers', 'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success', - 'cert_error', 'source_proto', 'protos_working' + 'cert_error', 'source_proto', 'protos_working', + 'last_check', 'last_target' ) def __init__(self, ip, port, proto, failcount, success_count, total_duration, @@ -343,6 +344,9 @@ class ProxyTestState(object): # Protocol fingerprinting self.source_proto = source_proto self.protos_working = None + # Test provenance + self.last_check = None + self.last_target = None def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. Thread-safe. @@ -480,9 +484,9 @@ class ProxyTestState(object): # and only if this test didn't detect MITM if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error: self.mitm = 0 - self.consecutive_success += 1 - self.success_count += 1 - self.total_duration += int(last_good['duration'] * 1000) + self.consecutive_success = (self.consecutive_success or 0) + 1 + self.success_count = (self.success_count or 0) + 1 + self.total_duration = (self.total_duration or 0) + int(last_good['duration'] * 1000) # Calculate average latency from successful tests (in ms) durations = [s['duration'] for s in successes if s['duration']] @@ -543,6 +547,9 @@ class TargetTestJob(object): _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter, self.proxy_state.proxy, self.target_srv, self.checktype), 'info') network_stats.set_category('proxy') + # Track test provenance (overwritten on each attempt, last success wins) + self.proxy_state.last_check = self.checktype + self.proxy_state.last_target = self.target_srv _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy) sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy) @@ -682,6 +689,90 @@ class TargetTestJob(object): protos.append(p) return protos + def _fingerprint_protocol(self, pool): + """Identify proxy protocol via lightweight handshake probes. + + Sends protocol-specific greeting bytes directly to the proxy + and identifies the protocol from the response pattern. + + Returns: 'socks5', 'socks4', 'http', or None + """ + ps = self.proxy_state + fp_timeout = min(config.watchd.timeout, 5) + torhost = pool.get_tor_host(self.worker_id) if pool else random.choice(config.torhosts) + + for probe_fn, name in ( + (self._probe_socks5, 'socks5'), + (self._probe_socks4, 'socks4'), + (self._probe_http, 'http'), + ): + result = probe_fn(ps, torhost, fp_timeout) + if result: + _sample_dbg('fingerprint: %s detected' % result, ps.proxy) + return result + return None + + def _probe_socks5(self, ps, torhost, timeout): + """Probe for SOCKS5 protocol. Returns 'socks5' or None.""" + try: + sock = rocksock.Rocksock( + host=ps.ip, port=int(ps.port), + proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], + timeout=timeout + ) + sock.connect() + sock.send('\x05\x01\x00') + res = sock.recv(2) + sock.disconnect() + if len(res) >= 1 and res[0] == '\x05': + return 'socks5' + except rocksock.RocksockException: + pass + except KeyboardInterrupt: + raise + return None + + def _probe_socks4(self, ps, torhost, timeout): + """Probe for SOCKS4 protocol. Returns 'socks4' or None.""" + try: + sock = rocksock.Rocksock( + host=ps.ip, port=int(ps.port), + proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], + timeout=timeout + ) + sock.connect() + # CONNECT 1.1.1.1:80 + sock.send('\x04\x01\x00\x50\x01\x01\x01\x01\x00') + res = sock.recv(2) + sock.disconnect() + if len(res) >= 2 and ord(res[0]) == 0 and ord(res[1]) in (0x5a, 0x5b, 0x5c, 0x5d): + return 'socks4' + except rocksock.RocksockException: + pass + except KeyboardInterrupt: + raise + return None + + def _probe_http(self, ps, torhost, timeout): + """Probe for HTTP CONNECT protocol. Returns 'http' or None.""" + try: + sock = rocksock.Rocksock( + host=ps.ip, port=int(ps.port), + proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], + timeout=timeout + ) + sock.connect() + sock.send('CONNECT 1.1.1.1:80 HTTP/1.1\r\nHost: 1.1.1.1:80\r\n\r\n') + res = sock.recv(13) + sock.disconnect() + if res.startswith('HTTP/'): + return 'http' + except rocksock.RocksockException: + pass + except KeyboardInterrupt: + raise + return None + def _connect_and_test(self): """Connect to target through the proxy and send test packet. @@ -702,6 +793,12 @@ class TargetTestJob(object): protos = self._build_proto_order() pool = connection_pool.get_pool() + # Fingerprint unknown proxies to avoid brute-force protocol guessing + if ps.proto is None and config.watchd.fingerprint: + detected = self._fingerprint_protocol(pool) + if detected: + protos = [detected] + [p for p in protos if p != detected] + # Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode) if config.watchd.ssl_first or self.checktype == 'none': result = self._try_ssl_handshake(protos, pool) @@ -1519,7 +1616,8 @@ class Proxywatchd(): dead_count += 1 args.append((effective_failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.asn, job.protos_working, job.proxy)) + job.consecutive_success, job.asn, job.protos_working, + job.last_check, job.last_target, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True @@ -1533,7 +1631,8 @@ class Proxywatchd(): if job.failcount == 0: args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.asn, job.protos_working, job.proxy)) + job.consecutive_success, job.asn, job.protos_working, + job.last_check, job.last_target, job.proxy)) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) ret = False @@ -1550,7 +1649,7 @@ class Proxywatchd(): if job.failcount == 0 and job.exit_ip] with self._db_context() as db: - query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?' + query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=? WHERE proxy=?' db.executemany(query, args) # Batch update latency metrics for successful proxies