#!/usr/bin/env python2 from __future__ import division # Gevent monkey-patching MUST happen before any other imports # This patches threading, socket, time, etc. for cooperative concurrency from gevent import monkey monkey.patch_all() import threading # Now patched by gevent import time import random import string import re import signal import network_stats import os import ssl import contextlib try: import Queue except ImportError: import queue as Queue try: import IP2Location geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) geolite = True except (ImportError, IOError, ValueError): geolite = False try: import pyasn asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat")) asn_enabled = True except (ImportError, IOError, NameError): asndb = None asn_enabled = False from config import Config import mysqlite import dbs import dns from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error import rocksock import connection_pool from stats import TargetStats, JudgeStats, Stats, regexes, ssl_targets, try_div from mitm import MITMCertStats, extract_cert_info, get_mitm_certificate from dns import socks4_resolve from job import PriorityJobQueue, calculate_priority # Optional scraper integration for engine stats try: import scraper as scraper_module scraper_available = True except ImportError: scraper_module = None scraper_available = False config = Config() def set_config(cfg): """Set the config object (used when imported from ppf.py).""" global config config = cfg dns.set_config(cfg) _run_standalone = False # Debug mode for proxy check path - set via PPF_DEBUG env or config _debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy') # Sampling debug: print detailed diagnostics for every Nth test _sample_debug_interval = 50 # Print debug for every 50th test (lowered for diagnosis) _sample_debug_counter = 0 _sample_debug_lock = threading.Lock() def _dbg(msg, proxy=None): """Debug log for proxy check path. Only logs when PPF_DEBUG=1.""" if _debug_proxy: prefix = '[%s] ' % proxy if proxy else '' # Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default) _log('%s%s' % (prefix, msg), 'dbg') def _sample_dbg(msg, proxy=None, force=False): """Sampled debug: log every Nth test for diagnostics without flooding.""" global _sample_debug_counter should_log = force if not should_log: with _sample_debug_lock: _sample_debug_counter += 1 if _sample_debug_counter >= _sample_debug_interval: _sample_debug_counter = 0 should_log = True if should_log: prefix = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] ' _log('%s%s' % (prefix, msg), 'diag') def _build_due_sql(): """Build SQL WHERE clause for proxies due for testing. Returns (sql_condition, params) using new schedule formula: - failed=0: tested + working_checktime < now - failed>0 with backoff: tested + (failed * fail_retry_interval) < now - failed>0 without backoff: tested + fail_retry_interval < now """ now = time.time() if config.watchd.fail_retry_backoff: # Linear backoff: multiply interval by failure count sql = '''failed >= 0 AND failed < ? AND ( tested IS NULL OR CASE WHEN failed = 0 THEN tested + ? < ? ELSE tested + (failed * ?) < ? END )''' params = (config.watchd.max_fail, config.watchd.working_checktime, now, config.watchd.fail_retry_interval, now) else: # Fixed interval: same delay regardless of failure count sql = '''failed >= 0 AND failed < ? AND ( tested IS NULL OR CASE WHEN failed = 0 THEN tested + ? < ? ELSE tested + ? < ? END )''' params = (config.watchd.max_fail, config.watchd.working_checktime, now, config.watchd.fail_retry_interval, now) return sql, params # IP pattern for judge services (validates response contains valid IP in body) IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' def is_valid_ip(ip_str): """Validate IP address octets are 0-255.""" try: parts = ip_str.split('.') return len(parts) == 4 and all(0 <= int(p) <= 255 for p in parts) except (ValueError, AttributeError): return False def is_public_ip(ip_str): """Validate IP is a public, globally routable address.""" if not is_valid_ip(ip_str): return False parts = [int(p) for p in ip_str.split('.')] if parts[0] == 0: return False # 0.0.0.0/8 if parts[0] == 10: return False # 10.0.0.0/8 if parts[0] == 127: return False # 127.0.0.0/8 if parts[0] == 169 and parts[1] == 254: return False # link-local if parts[0] == 172 and 16 <= parts[1] <= 31: return False # 172.16/12 if parts[0] == 192 and parts[1] == 168: return False # 192.168/16 if parts[0] >= 224: return False # multicast + reserved return True # Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)' # Dead proxy marker - proxies with failed=-1 are permanently dead and never retested DEAD_PROXY = -1 # Error categories that indicate proxy is definitely dead (not temporary failure) FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth') # Patterns indicating HTTP target is blocking the proxy (not a proxy failure) # These should NOT count as proxy failures - applies to judges and head targets HTTP_BLOCK_PATTERNS = [ r'HTTP/1\.[01] 403', # Forbidden r'HTTP/1\.[01] 429', # Too Many Requests r'HTTP/1\.[01] 503', # Service Unavailable r'captcha', # Captcha challenge r'challenge', # Generic challenge r'verify you are human', # Human verification r'rate.?limit', # Rate limiting r'too many requests', # Rate limiting r'access.?denied', # Access denied r'blocked', # Explicit block r'Checking your browser', # Cloudflare JS challenge ] HTTP_BLOCK_RE = re.compile('|'.join(HTTP_BLOCK_PATTERNS), re.IGNORECASE) # Check types: irc, http (header match), judges (body match), ssl (TLS handshake) # Judge services - return IP in body (plain text, JSON, or HTML) judges = { # Plain text IP 'api.ipify.org': IP_PATTERN, 'icanhazip.com': IP_PATTERN, 'checkip.amazonaws.com': IP_PATTERN, 'ifconfig.me/ip': IP_PATTERN, 'ipinfo.io/ip': IP_PATTERN, 'ip.me': IP_PATTERN, 'myip.dnsomatic.com': IP_PATTERN, 'ident.me': IP_PATTERN, 'ipecho.net/plain': IP_PATTERN, 'wtfismyip.com/text': IP_PATTERN, 'eth0.me': IP_PATTERN, 'l2.io/ip': IP_PATTERN, 'tnx.nl/ip': IP_PATTERN, 'wgetip.com': IP_PATTERN, 'curlmyip.net': IP_PATTERN, # JSON responses (IP pattern matches within JSON) 'httpbin.org/ip': IP_PATTERN, 'ip-api.com/json': IP_PATTERN, 'ipapi.co/json': IP_PATTERN, 'ipwhois.app/json': IP_PATTERN, # Header echo - for elite detection (check if proxy adds revealing headers) 'httpbin.org/headers': IP_PATTERN, # returns request headers as JSON } # Global instances judge_stats = JudgeStats() head_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) ssl_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) irc_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3) mitm_cert_stats = MITMCertStats() class ThreadScaler(object): """Dynamic thread scaling based on queue depth and success rate. With gevent greenlets, we can scale much more aggressively than with OS threads since greenlets are cooperative and lightweight. Scales up when: - Queue depth exceeds high watermark - Success rate is above threshold Scales down when: - Queue is nearly empty - Success rate drops below threshold """ def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5, scale_cooldown=10, scale_threshold=10.0): self.min_threads = min_threads self.max_threads = max_threads self.target_queue_per_thread = target_queue_per_thread self.last_scale_time = 0 self.scale_cooldown = scale_cooldown self.success_threshold = scale_threshold def should_scale(self, current_threads, queue_size, stats): """Determine if scaling is needed. Args: current_threads: Current number of active threads queue_size: Current job queue depth stats: Stats object with success/fail counts Returns: int: Target thread count (may be same as current) """ now = time.time() if (now - self.last_scale_time) < self.scale_cooldown: return current_threads # Calculate current success rate with stats.lock: total = stats.tested passed = stats.passed success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0 # Calculate ideal thread count based on queue depth # Never spawn more threads than jobs available ideal = max(self.min_threads, queue_size // self.target_queue_per_thread) ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size)) target = current_threads # Scale up: queue is deep - scale based on queue depth only # With greenlets, scale more aggressively (+5 instead of +2) if queue_size > current_threads * self.target_queue_per_thread: target = min(current_threads + 5, ideal, self.max_threads) # Scale down: queue is shallow elif queue_size < current_threads * 2: # Scale down faster when threads far exceed ideal reduction = 2 if current_threads <= ideal * 2 else 5 target = max(current_threads - reduction, ideal, self.min_threads) if target != current_threads: self.last_scale_time = now return target def status_line(self, current_threads, queue_size): """Return status string for logging.""" return 'threads=%d queue=%d target_per_thread=%d' % ( current_threads, queue_size, self.target_queue_per_thread) class ProxyTestState(object): """Thread-safe state for a proxy being tested. Holds test results and evaluates final pass/fail status. """ __slots__ = ( 'ip', 'port', 'proxy', 'auth', 'proto', 'failcount', 'checktime', 'success_count', 'total_duration', 'country', 'mitm', 'consecutive_success', 'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed', 'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers', 'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success', 'cert_error', 'source_proto', 'protos_working', 'last_check', 'last_target' ) def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, asn=None, oldies=False, completion_queue=None, proxy_full=None, source_proto=None): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) # Parse auth credentials from full proxy string (user:pass@ip:port) self.auth = None if proxy_full and '@' in str(proxy_full): auth_part = str(proxy_full).rsplit('@', 1)[0] if ':' in auth_part: self.auth = auth_part # user:pass self.proto = proto self.failcount = failcount self.checktime = None self.success_count = success_count self.total_duration = total_duration self.country = country self.mitm = mitm self.consecutive_success = consecutive_success self.asn = asn self.isoldies = oldies self.completion_queue = completion_queue # for signaling completion # thread-safe result accumulation self.lock = threading.Lock() self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False self.evaluated = False # for evaluate() idempotency self.last_latency_ms = None # average latency from successful tests self.exit_ip = None # IP seen by target server (for anonymity detection) self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers self.last_fail_category = None # failure category from last test (for dead detection) self.original_failcount = failcount # failcount before this test cycle # SSL/TLS tracking self.had_ssl_test = False self.ssl_success = False self.cert_error = False # Protocol fingerprinting self.source_proto = source_proto self.protos_working = None # Test provenance self.last_check = None self.last_target = None def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. Thread-safe. When all target tests complete, signals via completion_queue. """ _dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy) should_signal = False with self.lock: self.results.append({ 'success': success, 'proto': proto, 'duration': duration, 'srv': srv, 'tor': tor, 'ssl': ssl, 'category': category, 'exit_ip': exit_ip, 'reveals_headers': reveals_headers }) # Track SSL tests if ssl: self.had_ssl_test = True if success: self.ssl_success = True # Track cert errors if category in ('cert_error', 'ssl_error', 'ssl_mitm'): self.cert_error = True # Check completion (single-target mode) if not self.completed and len(self.results) >= 1: self.completed = True should_signal = True # Signal outside lock to avoid deadlock if should_signal and self.completion_queue is not None: self.completion_queue.put(self) def is_complete(self): """Check if all target tests have finished.""" # Fast path: check completed flag without lock (atomic read) if self.completed: return True # Slow path: check with lock (only during transition) with self.lock: return len(self.results) >= 1 @staticmethod def rwip(ip): """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10).""" return '.'.join(str(int(b)) for b in ip.split('.')) def evaluate(self): """Evaluate results after all tests complete. Returns: (success, category) tuple where success is bool and category is the dominant failure type (or None on success) """ with self.lock: if self.evaluated: # Already evaluated - return cached result return (self.failcount == 0, self.last_fail_category) self.evaluated = True self.checktime = int(time.time()) # Filter out target_block results (inconclusive, neither pass nor fail) block_cats = ('judge_block', 'target_block') real_results = [r for r in self.results if r.get('category') not in block_cats] successes = [r for r in real_results if r['success']] failures = [r for r in real_results if not r['success']] num_success = len(successes) target_blocks = len(self.results) - len(real_results) _dbg('evaluate: %d success, %d fail, %d target_block, results=%d' % ( num_success, len(failures), target_blocks, len(self.results)), self.proxy) # All results were target blocks: inconclusive, preserve current state if not real_results and self.results: _dbg('all results inconclusive (target_block), no state change', self.proxy) self.failcount = self.original_failcount return (self.original_failcount == 0, None) # Determine dominant failure category fail_category = None if failures: cats = {} for f in failures: cat = f.get('category') or 'other' cats[cat] = cats.get(cat, 0) + 1 if cats: fail_category = max(cats.keys(), key=lambda k: cats[k]) # require success (single target mode) if num_success >= 1: # use last successful result for metrics last_good = successes[-1] # Extract exit IP and header info from successful judge responses for s in successes: if s.get('exit_ip'): self.exit_ip = s['exit_ip'] if s.get('reveals_headers') is not None: self.reveals_headers = s['reveals_headers'] if geolite and self.country is None: self.ip = self.rwip(self.ip) rec = geodb.get_all(self.ip) if rec is not None and rec.country_short: self.country = rec.country_short if asn_enabled and self.asn is None: try: asn_result = asndb.lookup(self.ip) if asn_result and asn_result[0]: self.asn = asn_result[0] except Exception as e: if config.watchd.debug: _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug') # Collect all distinct working protocols working_protos = set() for s in successes: if s.get('proto'): working_protos.add(s['proto']) if working_protos: self.protos_working = ','.join(sorted(working_protos)) # Pick most specific protocol: socks5 > socks4 > http for best in ('socks5', 'socks4', 'http'): if best in working_protos: self.proto = best break else: self.proto = last_good['proto'] else: self.proto = last_good['proto'] self.failcount = 0 # Only reset mitm after 3 consecutive clean successes (not on first success) # and only if this test didn't detect MITM if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error: self.mitm = 0 self.consecutive_success = (self.consecutive_success or 0) + 1 self.success_count = (self.success_count or 0) + 1 self.total_duration = (self.total_duration or 0) + int(last_good['duration'] * 1000) # Calculate average latency from successful tests (in ms) durations = [s['duration'] for s in successes if s['duration']] if durations: self.last_latency_ms = sum(durations) * 1000.0 / len(durations) torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor'] # Determine anonymity for status message # transparent: exit_ip == proxy_ip # anonymous: exit_ip != proxy_ip, adds revealing headers # elite: exit_ip != proxy_ip, no revealing headers anon_status = '' if self.exit_ip: if self.exit_ip == self.ip: anon_status = ' anon: transparent;' elif self.reveals_headers is False: anon_status = ' anon: elite;' elif self.reveals_headers is True: anon_status = ' anon: anonymous;' else: anon_status = ' anon: anonymous;' # default if no header check _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % ( last_good['proto'], self.ip, self.port, self.country, last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info') _dbg('PASS: failcount=0', self.proxy) return (True, None) else: # Real proxy failure self.failcount += 1 self.consecutive_success = 0 self.last_fail_category = fail_category _dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy) return (False, fail_category) class TargetTestJob(object): """Job to test a single proxy against a single target. Multiple TargetTestJob instances share the same ProxyTestState, allowing tests to be interleaved with other proxies in the queue. """ __slots__ = ('proxy_state', 'target_srv', 'checktype', 'worker_id') def __init__(self, proxy_state, target_srv, checktype, worker_id=None): self.proxy_state = proxy_state self.target_srv = target_srv self.checktype = checktype self.worker_id = worker_id def run(self): """Test the proxy against this job's target server.""" # DIAGNOSTIC: Verify run() is being called global _sample_debug_counter with _sample_debug_lock: _sample_debug_counter += 1 if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0: _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter, self.proxy_state.proxy, self.target_srv, self.checktype), 'info') network_stats.set_category('proxy') # Track test provenance (overwritten on each attempt, last success wins) self.proxy_state.last_check = self.checktype self.proxy_state.last_target = self.target_srv _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy) sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy) if not sock: # SSL-only check passed (handshake success, no request needed) if err_cat == 'ssl_ok': elapsed = time.time() - duration self.proxy_state.record_result( True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl ) elif err_cat == 'ssl_mitm': # MITM detected - proxy works but intercepts TLS elapsed = time.time() - duration self.proxy_state.record_result( True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl ) self.proxy_state.mitm = 1 else: self.proxy_state.record_result(False, category=err_cat, ssl=is_ssl) return try: recv = sock.recv(-1) _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy) # Validate HTTP response for non-IRC checks if self.checktype != 'irc' and not recv.startswith('HTTP/'): _dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy) self.proxy_state.record_result(False, category='bad_response') return # Select regex based on check type (or fallback target) if 'check.torproject.org' in srv: # Tor API fallback (judge using torproject.org) regex = r'"IsTor"\s*:\s*true' elif self.checktype == 'irc': regex = '^(:|NOTICE|ERROR)' elif self.checktype == 'judges': regex = judges[srv] elif self.checktype == 'ssl': # Should not reach here - ssl returns before recv self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl) return else: # http regex = regexes[srv] _dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), self.proxy_state.proxy) if re.search(regex, recv, re.IGNORECASE): elapsed = time.time() - duration _dbg('regex MATCH, elapsed=%.2fs' % elapsed, self.proxy_state.proxy) # Extract exit IP from judge/tor response exit_ip = None reveals_headers = None if self.checktype == 'judges' or 'check.torproject.org' in srv: ip_match = re.search(IP_PATTERN, recv) if ip_match and is_public_ip(ip_match.group(0)): exit_ip = ip_match.group(0) if self.checktype == 'judges' and 'check.torproject.org' not in srv: # Check for header echo judge (elite detection) if 'headers' in srv: # If X-Forwarded-For/Via/etc present, proxy reveals chain reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE)) # Record successful judge judge_stats.record_success(srv) elif self.checktype == 'head': head_target_stats.record_success(srv) elif self.checktype == 'irc': irc_target_stats.record_success(srv) self.proxy_state.record_result( True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip, reveals_headers=reveals_headers ) else: _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy) # Check if HTTP target is blocking us (not a proxy failure) if self.checktype in ('judges', 'head') and HTTP_BLOCK_RE.search(recv): if self.checktype == 'judges': judge_stats.record_block(srv) else: head_target_stats.record_block(srv) _dbg('target BLOCK detected, skipping (neutral)', self.proxy_state.proxy) self.proxy_state.record_result( False, category='target_block', proto=proto, srv=srv, tor=tor, ssl=is_ssl ) if config.watchd.debug: _log('%s %s challenged proxy %s (neutral, skipped)' % ( self.checktype, srv, self.proxy_state.proxy), 'debug') else: _dbg('FAIL: no match, no block', self.proxy_state.proxy) if self.checktype == 'judges': judge_stats.record_failure(srv) elif self.checktype == 'head': head_target_stats.record_failure(srv) elif self.checktype == 'irc': irc_target_stats.record_failure(srv) self.proxy_state.record_result(False, category='other') except KeyboardInterrupt as e: sock.disconnect() raise e except rocksock.RocksockException as e: self.proxy_state.record_result(False, category=categorize_error(e)) finally: sock.disconnect() def _build_proto_order(self): """Build smart protocol test order based on available intelligence. Priority: 1. Previously successful proto (if set) 2. Source-detected proto (if different, confidence >= 60) 3. Remaining protos in default order: socks5, socks4, http For failing proxies (failcount > 0 and proto known), only retest with the known proto to save resources. """ ps = self.proxy_state default_order = ['socks5', 'socks4', 'http'] # Known proto from previous test: only retest that if ps.proto is not None: # For failing proxies, skip multi-proto discovery if ps.failcount > 0: return [ps.proto] # For working proxies, lead with known proto but try others protos = [ps.proto] # Add source hint if different if ps.source_proto and ps.source_proto != ps.proto: protos.append(ps.source_proto) # Fill remaining for p in default_order: if p not in protos: protos.append(p) return protos # Unknown proto: use source hint if available protos = [] if ps.source_proto: protos.append(ps.source_proto) for p in default_order: if p not in protos: protos.append(p) return protos def _fingerprint_protocol(self, pool): """Identify proxy protocol via lightweight handshake probes. Sends protocol-specific greeting bytes directly to the proxy and identifies the protocol from the response pattern. Returns: 'socks5', 'socks4', 'http', or None """ ps = self.proxy_state fp_timeout = min(config.watchd.timeout, 5) torhost = pool.get_tor_host(self.worker_id) if pool else random.choice(config.torhosts) for probe_fn, name in ( (self._probe_socks5, 'socks5'), (self._probe_socks4, 'socks4'), (self._probe_http, 'http'), ): result = probe_fn(ps, torhost, fp_timeout) if result: _sample_dbg('fingerprint: %s detected' % result, ps.proxy) return result return None def _probe_socks5(self, ps, torhost, timeout): """Probe for SOCKS5 protocol. Returns 'socks5' or None.""" try: sock = rocksock.Rocksock( host=ps.ip, port=int(ps.port), proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], timeout=timeout ) sock.connect() sock.send('\x05\x01\x00') res = sock.recv(2) sock.disconnect() if len(res) >= 1 and res[0] == '\x05': return 'socks5' except rocksock.RocksockException: pass except KeyboardInterrupt: raise return None def _probe_socks4(self, ps, torhost, timeout): """Probe for SOCKS4 protocol. Returns 'socks4' or None.""" try: sock = rocksock.Rocksock( host=ps.ip, port=int(ps.port), proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], timeout=timeout ) sock.connect() # CONNECT 1.1.1.1:80 sock.send('\x04\x01\x00\x50\x01\x01\x01\x01\x00') res = sock.recv(2) sock.disconnect() if len(res) >= 2 and ord(res[0]) == 0 and ord(res[1]) in (0x5a, 0x5b, 0x5c, 0x5d): return 'socks4' except rocksock.RocksockException: pass except KeyboardInterrupt: raise return None def _probe_http(self, ps, torhost, timeout): """Probe for HTTP CONNECT protocol. Returns 'http' or None.""" try: sock = rocksock.Rocksock( host=ps.ip, port=int(ps.port), proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))], timeout=timeout ) sock.connect() sock.send('CONNECT 1.1.1.1:80 HTTP/1.1\r\nHost: 1.1.1.1:80\r\n\r\n') res = sock.recv(13) sock.disconnect() if res.startswith('HTTP/'): return 'http' except rocksock.RocksockException: pass except KeyboardInterrupt: raise return None def _connect_and_test(self): """Connect to target through the proxy and send test packet. If ssl_first is enabled: 1. Try SSL handshake first 2. If SSL succeeds -> return success 3. If SSL fails -> try secondary check (configured checktype) """ ps = self.proxy_state _dbg('_connect_and_test: target=%s checktype=%s ssl_first=%s' % ( self.target_srv, self.checktype, config.watchd.ssl_first), ps.proxy) # Always log first test to verify code path global _sample_debug_counter if _sample_debug_counter == 0: _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % ( ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info') protos = self._build_proto_order() pool = connection_pool.get_pool() # Fingerprint unknown proxies to avoid brute-force protocol guessing if ps.proto is None and config.watchd.fingerprint: detected = self._fingerprint_protocol(pool) if detected: protos = [detected] + [p for p in protos if p != detected] # Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode) ssl_reason = None if config.watchd.ssl_first or self.checktype == 'none': result, ssl_reason = self._try_ssl_handshake(protos, pool) if result is not None: return result # SSL succeeded or MITM detected # SSL failed for all protocols if config.watchd.ssl_only or self.checktype == 'none': _dbg('SSL failed, no secondary check', ps.proxy) return (None, None, 0, None, None, 1, 0, 'ssl_only') _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy) # Phase 2: Secondary check (configured checktype) return self._try_secondary_check(protos, pool, ssl_reason) def _try_ssl_handshake(self, protos, pool): """Attempt SSL handshake to verify proxy works with TLS. Returns: (result, ssl_reason) where result is a tuple on success/MITM or None on failure, and ssl_reason is the last SSL error reason string (for secondary check SSL/plain decision). """ ps = self.proxy_state available_ssl = ssl_target_stats.get_available(ssl_targets) or ssl_targets ssl_target = random.choice(available_ssl) last_error_category = None last_ssl_reason = None for proto in protos: if pool: torhost = pool.get_tor_host(self.worker_id) else: torhost = random.choice(config.torhosts) network_stats.set_tor_node(torhost) if proto == 'socks4': srv = socks4_resolve(ssl_target, 443) else: srv = ssl_target if not srv: continue duration = time.time() if ps.auth: proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) else: proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), rocksock.RocksockProxyFromURL(proxy_url), ] adaptive_timeout = config.watchd.timeout + min( ps.failcount * config.watchd.timeout_fail_inc, config.watchd.timeout_fail_max) try: sock = rocksock.Rocksock(host=srv, port=443, ssl=1, proxies=proxies, timeout=adaptive_timeout, verifycert=True) _dbg('SSL handshake: proto=%s tor=%s target=%s' % (proto, torhost, ssl_target), ps.proxy) sock.connect() # SSL handshake succeeded elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) ssl_target_stats.record_success(ssl_target) sock.disconnect() _dbg('SSL handshake OK', ps.proxy) return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'), None except rocksock.RocksockException as e: last_error_category = categorize_error(e) et = e.get_errortype() err = e.get_error() # Track SSL reason for secondary check decision if et == rocksock.RS_ET_SSL: reason = e.get_failedproxy() if isinstance(reason, str): last_ssl_reason = reason try: sock.disconnect() except: pass if et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: # MITM detected - proxy works but intercepts TLS ps.mitm = 1 elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) _dbg('SSL MITM detected', ps.proxy) return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'), None if config.watchd.debug: _log('SSL handshake failed: %s://%s:%d: %s' % ( proto, ps.ip, ps.port, e.get_errormessage()), 'debug') # Check for Tor connection issues if et == rocksock.RS_ET_OWN: fp = e.get_failedproxy() if fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: if pool: pool.record_failure(torhost) elif fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or err == rocksock.RS_E_HIT_TIMEOUT): # Target-side failure ssl_target_stats.record_failure(ssl_target) elif et == rocksock.RS_ET_GAI: # DNS failure -- target unresolvable ssl_target_stats.record_block(ssl_target) except KeyboardInterrupt: raise # All protocols failed SSL return None, last_ssl_reason def _try_secondary_check(self, protos, pool, ssl_reason=None): """Try the configured secondary checktype (head, judges, irc). ssl_reason: last SSL error reason from _try_ssl_handshake, used to decide whether to use SSL or plain HTTP for the secondary check. Protocol errors (proxy doesn't speak TLS) -> plain HTTP. Other errors (cert, timeout, etc.) -> SSL without cert verification. """ ps = self.proxy_state _sample_dbg('TEST START: proxy=%s target=%s check=%s' % ( ps.proxy, self.target_srv, self.checktype), ps.proxy) srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv # For judges, extract host from 'host/path' format if self.checktype == 'judges' and '/' in srvname: connect_host = srvname.split('/')[0] else: connect_host = srvname # Decide SSL based on why the primary handshake failed: # - protocol error (proxy can't TLS) -> plain HTTP # - other error (cert, timeout) -> SSL without cert verification # - no ssl_reason (ssl_first off) -> plain HTTP (no prior info) protocol_error = is_ssl_protocol_error(ssl_reason) if ssl_reason else True verifycert = False if protocol_error: use_ssl = 0 if self.checktype == 'irc': server_port = 6667 else: server_port = 80 _dbg('secondary: plain (ssl protocol error)', ps.proxy) else: use_ssl = 1 if self.checktype == 'irc': server_port = 6697 else: server_port = 443 _dbg('secondary: ssl/no-verify (non-protocol ssl error)', ps.proxy) last_error_category = None for proto in protos: if pool: torhost = pool.get_tor_host(self.worker_id) else: torhost = random.choice(config.torhosts) network_stats.set_tor_node(torhost) if proto == 'socks4': srv = socks4_resolve(connect_host, server_port) else: srv = connect_host if not srv: continue duration = time.time() # Build proxy URL, including auth credentials if present if ps.auth: proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) else: proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), rocksock.RocksockProxyFromURL(proxy_url), ] # Adaptive timeout: give proxies with failures more time adaptive_timeout = config.watchd.timeout + min( ps.failcount * config.watchd.timeout_fail_inc, config.watchd.timeout_fail_max) try: sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, proxies=proxies, timeout=adaptive_timeout, verifycert=verifycert) _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy) _sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % ( torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy) sock.connect() _dbg('connected OK', ps.proxy) _sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy) if self.checktype == 'irc': sock.send('NICK\n') elif self.checktype == 'judges': # GET request to receive body (IP) sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % ( srvname.split('/', 1)[1] if '/' in srvname else '', srvname.split('/')[0] )) else: # head - HEAD is sufficient for header checks sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) # Record success in pool if pool: pool.record_success(torhost, time.time() - duration) return sock, proto, duration, torhost, srvname, 0, use_ssl, None except rocksock.RocksockException as e: last_error_category = categorize_error(e) if config.watchd.debug: _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, e.get_errormessage(), last_error_category), 'debug') et = e.get_errortype() err = e.get_error() fp = e.get_failedproxy() _sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % ( ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy) sock.disconnect() if et == rocksock.RS_ET_OWN: if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or err == rocksock.RS_E_HIT_TIMEOUT): # Target-side failure -- proxy reached target but it's down if self.checktype == 'head': head_target_stats.record_failure(srvname) elif self.checktype == 'irc': irc_target_stats.record_failure(srvname) break elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: # Tor connection failed - record in pool if pool: pool.record_failure(torhost) if random.randint(0, (config.watchd.threads - 1) / 2) == 0: _log("could not connect to tor, sleep 5s", "ERROR") time.sleep(5) elif et == rocksock.RS_ET_GAI: # DNS failure -- target hostname unresolvable (hard failure) if self.checktype == 'head': head_target_stats.record_block(connect_host) elif self.checktype == 'irc': irc_target_stats.record_block(srvname) _log("could not resolve connection target %s" % connect_host, "ERROR") break elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: # MITM detected - proxy works but intercepts TLS ps.mitm = 1 elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) # Extract MITM certificate info (async to not block) try: cert_info = get_mitm_certificate( ps.ip, ps.port, proto, torhost, connect_host, server_port, config.watchd.timeout, auth=ps.auth ) if cert_info: mitm_cert_stats.add_cert(ps.ip, cert_info) if config.watchd.debug: _log('MITM cert: %s (CN=%s, O=%s)' % ( cert_info.get('fingerprint', ''), cert_info.get('subject_cn', ''), cert_info.get('subject_o', '')), 'debug') except Exception as e: if config.watchd.debug: _log('failed to extract MITM cert: %s' % str(e), 'debug') return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm' except KeyboardInterrupt as e: raise e _sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy) return None, None, None, None, None, 1, use_ssl, last_error_category class WorkerThread(): def __init__(self, id, job_queue): self.id = id self.done = threading.Event() self.thread = None self.job_queue = job_queue # shared input queue def stop(self): self.done.set() def term(self): if self.thread: self.thread.join() def start_thread(self): self.thread = threading.Thread(target=self.workloop) self.thread.start() def workloop(self): job_count = 0 duration_total = 0 while not self.done.is_set(): try: job = self.job_queue.get(timeout=0.5) except Queue.Empty: continue nao = time.time() # Assign worker ID for connection pool affinity job.worker_id = self.id try: job.run() except Exception as e: # Ensure state completes on unexpected exceptions (prevents memory leak) _log('job exception: %s' % e, 'error') try: job.proxy_state.record_result(False, category='exception') except Exception: pass # State may already be completed spent = time.time() - nao job_count += 1 duration_total += spent self.job_queue.task_done() if self.thread and job_count > 0: avg_t = try_div(duration_total, job_count) _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id) class VerificationThread(threading.Thread): """Manager thread that verifies disputed/suspicious proxy results. Pulls from verification_queue, tests proxies directly via Tor, records authoritative results, and updates worker trust scores. """ def __init__(self, database, interval=30, batch_size=10): threading.Thread.__init__(self) self.database = database self.interval = interval self.batch_size = batch_size self.daemon = True self._stop = threading.Event() self.verified_count = 0 self.disagreements_resolved = 0 def stop(self): self._stop.set() def run(self): _log('verification thread started (interval=%ds, batch=%d)' % ( self.interval, self.batch_size), 'verify') while not self._stop.is_set(): try: self._verification_cycle() except Exception as e: _log('verification cycle error: %s' % e, 'error') # Wait for next cycle self._stop.wait(self.interval) _log('verification thread stopped (verified=%d, resolved=%d)' % ( self.verified_count, self.disagreements_resolved), 'verify') def _verification_cycle(self): """Process pending verifications.""" db = mysqlite.mysqlite(self.database, str) try: pending = dbs.get_pending_verifications(db, self.batch_size) if not pending: return for item in pending: if self._stop.is_set(): break proxy = item['proxy'] trigger = item['trigger'] # Test proxy directly result = self._test_proxy(proxy) if result is None: # Test failed/inconclusive, skip for now continue # Record verification result self._record_verification(db, item, result) self.verified_count += 1 if trigger == 'disagreement': self.disagreements_resolved += 1 db.commit() finally: db.close() def _test_proxy(self, proxy): """Test proxy directly using local Tor connection. Returns: True if proxy works, False if fails, None if inconclusive """ parts = proxy.split(':') if len(parts) != 2: return None ip, port_str = parts try: port = int(port_str) except ValueError: return None # Try SSL handshake as primary test pool = connection_pool.get_pool() if not pool: return None proto_map = { 'http': rocksock.RS_PT_HTTP, 'socks4': rocksock.RS_PT_SOCKS4, 'socks5': rocksock.RS_PT_SOCKS5 } protos = ['http', 'socks5', 'socks4'] for proto in protos: try: sock = rocksock.Rocksock( host='www.google.com', port=443, ssl=True, timeout=config.common.timeout_connect, proxies=[ (tor_proxy_url(pool.get_host(), proto='socks5'), rocksock.RS_PT_SOCKS5), (proxy, proto_map.get(proto, rocksock.RS_PT_HTTP)) ] ) sock.connect() sock.disconnect() return True except Exception: continue return False def _record_verification(self, db, item, result): """Record verification result and update worker trust.""" proxy = item['proxy'] trigger = item['trigger'] worker_a = item.get('worker_a') worker_b = item.get('worker_b') result_a = item.get('result_a') result_b = item.get('result_b') # Update worker trust based on who was correct if trigger == 'disagreement' and worker_a and worker_b: # Check which worker(s) agreed with verification result_int = 1 if result else 0 if result_a == result_int: dbs.update_worker_trust(db, worker_a, True) else: dbs.update_worker_trust(db, worker_a, False) if result_b == result_int: dbs.update_worker_trust(db, worker_b, True) else: dbs.update_worker_trust(db, worker_b, False) elif worker_a: # Single worker trigger (resurrection/sudden_death) result_int = 1 if result else 0 was_correct = (result_a == result_int) dbs.update_worker_trust(db, worker_a, was_correct) # Update proxy status with authoritative result now_int = int(time.time()) if result: db.execute(''' UPDATE proxylist SET failed = 0, tested = ?, last_seen = ? WHERE proxy = ? ''', (now_int, now_int, proxy)) else: db.execute(''' UPDATE proxylist SET failed = failed + 1, tested = ? WHERE proxy = ? ''', (now_int, proxy)) # Remove from verification queue dbs.remove_from_verification_queue(db, proxy) _log('verified %s: %s (trigger=%s)' % ( proxy, 'PASS' if result else 'FAIL', trigger), 'verify') class Proxywatchd(): def stop(self): _log('halting... (%d thread(s))' % len(self.threads), 'watchd') self.stopping.set() def _cleanup(self): for wt in self.threads: wt.stop() for wt in self.threads: wt.term() self.collect_work() self.submit_collected() if self.httpd_server: self.httpd_server.stop() if self.verification_thread: self.verification_thread.stop() self.stopped.set() def finish(self): if not self.in_background: self._cleanup() while not self.stopped.is_set(): time.sleep(0.1) success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100 _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd") # Final save of session state before exit try: with self._db_context() as db: dbs.save_session_state(db, self.stats) dbs.save_stats_snapshot(db, self.stats) _log('session state saved', 'watchd') except Exception as e: _log('failed to save final session state: %s' % str(e), 'warn') # Save MITM certificate stats try: mitm_cert_stats.save_state(self.mitm_state_file) _log('MITM cert state saved', 'watchd') except Exception as e: _log('failed to save MITM state: %s' % str(e), 'warn') @contextlib.contextmanager def _db_context(self): """Context manager for database connections.""" db = mysqlite.mysqlite(config.watchd.database, str) try: yield db finally: db.close() def __init__(self): config.load() dns.set_config(config) self.in_background = False self.threads = [] self.stopping = threading.Event() self.stopped = threading.Event() # shared work-stealing queues self.job_queue = PriorityJobQueue() self.completion_queue = Queue.Queue() # completed ProxyTestState objects # track pending proxy states self.pending_states = {} # dict: proxy -> ProxyTestState self.pending_lock = threading.Lock() # create table if needed (use dbs.py for canonical schema) with self._db_context() as db: dbs.create_table_if_not_exists(db, 'proxylist') self.submit_after = config.watchd.submit_after # number of collected jobs before writing db self.collected = [] # completed ProxyTestState objects ready for DB self.totals = { 'submitted':0, 'success':0, } self.stats = Stats() self.last_cleanup = time.time() self.httpd_server = None self.verification_thread = None # Dynamic thread scaling min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4) self.scaler = ThreadScaler( min_threads=min_t, max_threads=config.watchd.threads, target_queue_per_thread=5, scale_cooldown=config.watchd.scale_cooldown, scale_threshold=config.watchd.scale_threshold ) self.thread_id_counter = 0 self.last_jobs_log = 0 self.last_update_log = 0 self.log_interval = 60 # seconds between routine log messages # Register SIGHUP handler for config reload signal.signal(signal.SIGHUP, self._handle_sighup) def _handle_sighup(self, signum, frame): """Handle SIGHUP signal for config reload.""" _log('received SIGHUP, reloading config...', 'watchd') self.reload_config() def reload_config(self): """Reload configuration without restart. Hot-reloadable settings: threads, timeout, checktime, perfail_checktime, submit_after, stats_interval, debug, tor_safeguard, stale_days, max_fail, outage_threshold Requires restart: database, source_file, checktype """ old_threads = config.watchd.threads old_checktypes = list(config.watchd.checktypes) try: config.load() errors = config.validate() if errors: for e in errors: _log('config error: %s' % e, 'error') _log('config reload failed, keeping old config', 'error') return False except Exception as e: _log('config reload failed: %s' % str(e), 'error') return False # Update runtime values self.submit_after = config.watchd.submit_after # Update scaler limits self.scaler.min_threads = max(5, config.watchd.threads // 4) self.scaler.max_threads = config.watchd.threads # Warn about values requiring restart if config.watchd.checktypes != old_checktypes: _log('checktype changed (%s -> %s), requires restart to take effect' % ( ','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn') _log('config reloaded: threads=%d timeout=%d working=%ds fail_interval=%ds backoff=%s' % ( config.watchd.threads, config.watchd.timeout, config.watchd.working_checktime, config.watchd.fail_retry_interval, config.watchd.fail_retry_backoff), 'watchd') return True def _spawn_thread(self): """Spawn a new worker thread.""" self.thread_id_counter += 1 threadid = 'dyn%d' % self.thread_id_counter wt = WorkerThread(threadid, self.job_queue) wt.start_thread() self.threads.append(wt) return threadid def _remove_thread(self): """Remove one worker thread (graceful shutdown).""" if len(self.threads) <= self.scaler.min_threads: return None # Stop the last thread wt = self.threads.pop() wt.stop() # Don't wait for it - let it finish its current job return wt.id def _adjust_threads(self, target): """Adjust thread count to target.""" current = len(self.threads) if target > current: added = [] for _ in range(target - current): tid = self._spawn_thread() added.append(tid) if added: _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler') elif target < current: removed = [] for _ in range(current - target): tid = self._remove_thread() if tid: removed.append(tid) if removed: _log('scaled down: -%d threads' % len(removed), 'scaler') def fetch_rows(self, db): """Fetch proxy rows due for testing from database.""" self.isoldies = False # Build due condition using new schedule formula due_sql, due_params = _build_due_sql() q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm, consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % ( config.watchd.working_checktime, config.watchd.fail_retry_interval, config.watchd.fail_retry_backoff, config.watchd.max_fail)) rows = db.execute(q, due_params).fetchall() _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads)) # Check oldies? (dead proxies getting a second chance) if len(rows) < config.watchd.threads: _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies) rows = [] if config.watchd.oldies: self.isoldies = True # Disable tor safeguard for old proxies if self.tor_safeguard: self.tor_safeguard = False # Oldies use simple checktime-based query (dead proxies) now = time.time() oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2) q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country, mitm,consecutive_success,asn,proxy,source_proto FROM proxylist WHERE failed >= ? AND failed < ? AND (tested + ?) < ? ORDER BY RANDOM()''' rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max, config.watchd.oldies_checktime, now)).fetchall() return rows def prepare_jobs(self): ## enable tor safeguard by default self.tor_safeguard = config.watchd.tor_safeguard # Fetch rows within context manager scope with self._db_context() as db: rows = self.fetch_rows(db) _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes)) checktypes = config.watchd.checktypes # Build target pools for each checktype (filter out targets in cooldown) target_pools = {} for ct in checktypes: if ct == 'none': # SSL-only mode: use ssl_targets as placeholder target_pools[ct] = ssl_targets _dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets)) elif ct == 'irc': all_servers = config.servers available = irc_target_stats.get_available(all_servers) target_pools[ct] = available if available else all_servers _dbg('target_pool[irc]: %d/%d servers available' % (len(target_pools[ct]), len(all_servers))) elif ct == 'judges': all_judges = list(judges.keys()) available = judge_stats.get_available(all_judges) target_pools[ct] = available if available else all_judges elif ct == 'ssl': available = ssl_target_stats.get_available(ssl_targets) target_pools[ct] = available if available else ssl_targets _dbg('target_pool[ssl]: %d/%d targets available' % (len(target_pools[ct]), len(ssl_targets))) else: # head all_targets = list(regexes.keys()) available = head_target_stats.get_available(all_targets) target_pools[ct] = available if available else all_targets _dbg('target_pool[%s]: %d/%d targets available' % (ct, len(target_pools[ct]), len(all_targets))) # create all jobs first, then shuffle for interleaving all_jobs = [] new_states = [] for row in rows: # create shared state for this proxy # row: ip, port, proto, failed, success_count, total_duration, # country, mitm, consecutive_success, asn, proxy, source_proto state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], asn=row[9], oldies=self.isoldies, completion_queue=self.completion_queue, proxy_full=row[10], source_proto=row[11] ) new_states.append(state) # randomly select checktype for this proxy checktype = random.choice(checktypes) target_pool = target_pools[checktype] # select single target (single-target mode) target = random.choice(target_pool) # create job for this proxy job = TargetTestJob(state, target, checktype) all_jobs.append(job) # shuffle to interleave tests across different proxies random.shuffle(all_jobs) # track pending states (dict for O(1) lookup/removal) with self.pending_lock: for state in new_states: self.pending_states[state.proxy] = state # queue all jobs with priority for job in all_jobs: priority = calculate_priority( job.proxy_state.failcount, job.proxy_state.success_count, config.watchd.max_fail ) self.job_queue.put(job, priority) proxy_count = len(new_states) job_count = len(all_jobs) _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count)) now = time.time() if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval: _log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd') self.last_jobs_log = now return job_count def collect_work(self): # process completed states from completion queue (event-driven, not polling) # ProxyTestState.record_result() pushes to completion_queue when all targets done completed_count = 0 while True: try: state = self.completion_queue.get_nowait() completed_count += 1 # evaluate and record stats success, category = state.evaluate() self.stats.record(success, category, state.proto, state.last_latency_ms, state.country, state.asn, ssl_test=state.had_ssl_test, mitm=(state.mitm > 0), cert_error=state.cert_error) self.collected.append(state) # remove from pending dict (O(1)) with self.pending_lock: self.pending_states.pop(state.proxy, None) except Queue.Empty: break def collect_unfinished(self): # drain any remaining jobs from job queue unfinished_count = 0 while True: try: self.job_queue.get_nowait() unfinished_count += 1 except Queue.Empty: break if unfinished_count > 0: _log("discarded %d unfinished jobs" % unfinished_count, "watchd") # note: corresponding ProxyTestStates will be incomplete # they'll be re-tested in the next cycle def submit_collected(self): if len(self.collected) == 0: return True sc = 0 dead_count = 0 args = [] latency_updates = [] # (proxy, latency_ms) for successful tests max_fail = config.watchd.max_fail _dbg('submit_collected: %d jobs' % len(self.collected)) for job in self.collected: if job.failcount == 0: sc += 1 _dbg('submit OK: failcount=0', job.proxy) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) # Check if proxy should be marked as permanently dead effective_failcount = job.failcount if job.failcount > 0: is_fatal = job.last_fail_category in FATAL_ERROR_CATEGORIES # Fatal errors (refused/unreachable/auth) = immediately dead if is_fatal: effective_failcount = DEAD_PROXY dead_count += 1 # Non-fatal: mark dead if exceeded max_fail*2 elif job.failcount >= max_fail * 2: effective_failcount = DEAD_PROXY dead_count += 1 args.append((effective_failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.asn, job.protos_working, job.last_check, job.last_target, effective_failcount, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard: _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR") if sc == 0: return False args = [] latency_updates = [] for job in self.collected: if job.failcount == 0: args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.asn, job.protos_working, job.last_check, job.last_target, job.failcount, job.proxy)) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) ret = False now = time.time() if (now - self.last_update_log) >= self.log_interval or success_rate > 0: dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else '' _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd') self.last_update_log = now # Build anonymity updates before DB context anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers) for job in self.collected if job.failcount == 0 and job.exit_ip] # Separate dead proxies for deletion dead_proxies = [a[-1] for a in args if a[0] == DEAD_PROXY or a[0] >= max_fail] live_args = [a for a in args if a[0] != DEAD_PROXY and a[0] < max_fail] with self._db_context() as db: query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=?,last_seen=CASE WHEN ?=0 THEN strftime("%s","now") ELSE last_seen END WHERE proxy=?' if live_args: db.executemany(query, live_args) # Delete proxies that reached max_fail if dead_proxies: db.executemany('DELETE FROM proxylist WHERE proxy=?', [(p,) for p in dead_proxies]) _log('deleted %d dead proxies' % len(dead_proxies), 'watchd') # Batch update latency metrics for successful proxies if latency_updates: dbs.batch_update_proxy_latency(db, latency_updates) # Batch update anonymity for proxies with exit IP data if anonymity_updates: dbs.batch_update_proxy_anonymity(db, anonymity_updates) db.commit() self.collected = [] self.totals['submitted'] += len(args) self.totals['success'] += sc return ret def cleanup_stale(self): """Remove proxies that have been dead for too long.""" stale_seconds = config.watchd.stale_days * 86400 cutoff = int(time.time()) - stale_seconds with self._db_context() as db: # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff result = db.execute( 'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?', (config.watchd.max_fail, DEAD_PROXY, cutoff) ) count = result.rowcount if hasattr(result, 'rowcount') else 0 db.commit() if count > 0: _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') self.last_cleanup = time.time() def start(self): if config.watchd.threads == 1 and _run_standalone: return self._run() else: return self._run_background() def run(self): if self.in_background: while 1: time.sleep(1) def _run_background(self): self.in_background = True t = threading.Thread(target=self._run) t.start() def get_runtime_stats(self): """Return runtime statistics for the dashboard. Returns: dict with tested, passed, success_rate, rate, uptime_seconds, threads, failures, tor_pool, and engine stats. """ stats_data = {} # Stats from Stats object with self.stats.lock: stats_data['tested'] = self.stats.tested stats_data['passed'] = self.stats.passed stats_data['failed'] = self.stats.failed elapsed = time.time() - self.stats.start_time stats_data['uptime_seconds'] = int(elapsed) stats_data['rate'] = try_div(self.stats.tested, elapsed) stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested) stats_data['failures'] = dict(self.stats.fail_categories) # New: Protocol breakdown stats_data['by_proto'] = dict(self.stats.by_proto) # New: Rate history for sparklines stats_data['rate_history'] = list(self.stats.rate_history) stats_data['success_rate_history'] = list(self.stats.success_rate_history) # New: Peak rate stats_data['peak_rate'] = self.stats.peak_rate # New: Recent rate (last 60s) and avg latency stats_data['recent_rate'] = self.stats.get_recent_rate() stats_data['recent_success_rate'] = self.stats.get_recent_success_rate() stats_data['avg_latency'] = self.stats.get_avg_latency() # Latency stats stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0 stats_data['max_latency'] = self.stats.max_latency stats_data['latency_percentiles'] = self.stats.get_latency_percentiles() stats_data['latency_histogram'] = self.stats.get_latency_histogram() # Protocol performance (tested, passed, rate per protocol) stats_data['proto_stats'] = self.stats.get_proto_stats() # Geographic distribution from session stats_data['top_countries_session'] = self.stats.get_top_countries(10) stats_data['top_asns_session'] = self.stats.get_top_asns(10) # SSL/TLS stats with self.stats.lock: ssl_tested = self.stats.ssl_tested ssl_passed = self.stats.ssl_passed mitm_stats = mitm_cert_stats.get_stats() stats_data['ssl'] = { 'tested': ssl_tested, 'passed': ssl_passed, 'failed': self.stats.ssl_failed, 'fail_categories': dict(self.stats.ssl_fail_categories), 'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0, 'mitm_detected': self.stats.mitm_detected, 'cert_errors': self.stats.cert_errors, 'mitm_unique_certs': mitm_stats.get('unique_certs', 0), 'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0) } # Thread info stats_data['threads'] = len(self.threads) stats_data['min_threads'] = self.scaler.min_threads stats_data['max_threads'] = self.scaler.max_threads stats_data['queue_size'] = self.job_queue.qsize() stats_data['checktype'] = ','.join(config.watchd.checktypes) stats_data['checktypes'] = config.watchd.checktypes stats_data['profiling'] = ( getattr(config.args, 'profile', False) if hasattr(config, 'args') else False ) or getattr(config.common, 'profiling', False) stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) # Tor pool stats pool = connection_pool.get_pool() if pool: pool_stats = pool.get_stats() hosts_list = [] healthy_count = 0 latency_sum = 0.0 latency_count = 0 for h in pool_stats.get('hosts', []): is_healthy = h.get('available', False) lat_ms = h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None hosts_list.append({ 'address': h.get('host', ''), 'healthy': is_healthy, 'latency_ms': lat_ms, 'success_rate': h.get('success_rate', 0), }) if is_healthy: healthy_count += 1 if lat_ms and lat_ms > 0: latency_sum += lat_ms latency_count += 1 pool_info = { 'hosts': hosts_list, 'total_requests': pool_stats.get('total_requests', 0), 'success_rate': pool_stats.get('success_rate', 0), 'healthy_count': healthy_count, 'total_count': len(hosts_list), 'avg_latency': latency_sum / latency_count if latency_count > 0 else 0, } stats_data['tor_pool'] = pool_info else: stats_data['tor_pool'] = { 'hosts': [], 'total_requests': 0, 'success_rate': 0, 'healthy_count': 0, 'total_count': 0, 'avg_latency': 0 } # Judge stats (when using judges checktype) if 'judges' in config.watchd.checktypes: js = judge_stats.get_stats() # Remap 'target' -> 'judge' for dashboard compatibility top = [dict(j, judge=j['target']) for j in js.get('top', [])[:5]] stats_data['judges'] = { 'total': js.get('total', 0), 'available': js.get('available', 0), 'in_cooldown': js.get('in_cooldown', 0), 'top_judges': top, } else: stats_data['judges'] = None # Target health stats (all target pools) stats_data['target_health'] = { 'head': head_target_stats.get_stats(), 'ssl': ssl_target_stats.get_stats(), 'irc': irc_target_stats.get_stats(), 'judges': judge_stats.get_stats(), } # Scraper/engine stats if scraper_available: scraper_stats = scraper_module.get_scraper_stats() if scraper_stats: stats_data['engines_available'] = scraper_stats.get('available', 0) stats_data['engines_total'] = scraper_stats.get('total', 0) stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0) stats_data['scraper'] = scraper_stats else: stats_data['engines_available'] = 0 stats_data['engines_total'] = 0 stats_data['engines_backoff'] = 0 stats_data['scraper'] = None else: stats_data['engines_available'] = 0 stats_data['engines_total'] = 0 stats_data['engines_backoff'] = 0 stats_data['scraper'] = None # Network usage stats stats_data['network'] = network_stats.get_stats() # MITM certificate stats (detailed) stats_data['mitm'] = mitm_cert_stats.get_stats() return stats_data def _run(self): _log('starting...', 'watchd') _log('config: db=%s checktype=%s threads=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % ( config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads, config.watchd.working_checktime, config.watchd.fail_retry_interval, config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd') # Log database status at startup with self._db_context() as db: total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] due_sql, due_params = _build_due_sql() due = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + due_sql, due_params).fetchone()[0] # Create stats persistence tables dbs.create_table_if_not_exists(db, 'stats_history') dbs.create_table_if_not_exists(db, 'session_state') # Load persisted session state saved_state = dbs.load_session_state(db) if saved_state: self.stats.load_state(saved_state) # Load MITM certificate state (same directory as database) db_dir = os.path.dirname(config.watchd.database) or '.' self.mitm_state_file = os.path.join(db_dir, 'mitm_certs.json') mitm_cert_stats.load_state(self.mitm_state_file) _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd') # Initialize Tor connection pool connection_pool.init_pool(config.torhosts, warmup=True) # Start HTTP API server if enabled if config.httpd.enabled: from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring # Pass schedule config to httpd module configure_schedule( config.watchd.working_checktime, config.watchd.fail_retry_interval, config.watchd.fail_retry_backoff, config.watchd.max_fail ) configure_url_scoring( config.ppf.checktime, config.ppf.perfail_checktime, config.ppf.max_fail, config.ppf.list_max_age_days ) self.httpd_server = ProxyAPIServer( config.httpd.listenip, config.httpd.port, config.watchd.database, stats_provider=self.get_runtime_stats, url_database=config.ppf.database, ) self.httpd_server.start() # Start verification thread if enabled (manager-only, checks disputed results) if config.verification.enabled and config.watchd.threads > 0: self.verification_thread = VerificationThread( database=config.watchd.database, interval=config.verification.interval, batch_size=config.verification.batch_size ) self.verification_thread.start() _log('verification thread enabled', 'watchd') # create worker threads with shared queues for i in range(config.watchd.threads): threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)]) wt = WorkerThread(threadid, self.job_queue) if self.in_background: wt.start_thread() self.threads.append(wt) time.sleep(random.random() / 10) sleeptime = 0 while True: if self.stopping.is_set(): if self.in_background: self._cleanup() break if sleeptime > 0: time.sleep(1) sleeptime -= 1 continue # Skip job processing when threads=0 (master-only mode) if config.watchd.threads > 0: # check if job queue is empty (work-stealing: threads pull as needed) if self.job_queue.empty(): self.collect_work() if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 else: job_count = self.prepare_jobs() if job_count == 0: # no jobs available, wait before checking again sleeptime = 10 if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() if len(self.collected) > self.submit_after: if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 else: # Master-only mode: sleep to avoid busy loop sleeptime = 10 # Update rate history for sparklines self.stats.update_history() # periodic stats report if self.stats.should_report(config.watchd.stats_interval): _log(self.stats.report(), 'stats') # Also report pool stats pool = connection_pool.get_pool() if pool: _log(pool.status_line(), 'stats') # Report judge stats (when using judges checktype) if 'judges' in config.watchd.checktypes: _log(judge_stats.status_line(), 'stats') # Report scaler status _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler') # Save session state periodically (every stats_interval, default 5m) try: with self._db_context() as db: dbs.save_session_state(db, self.stats) except Exception as e: _log('failed to save session state: %s' % str(e), 'warn') # Save MITM certificate stats periodically try: mitm_cert_stats.save_state(self.mitm_state_file) except Exception as e: _log('failed to save MITM state: %s' % str(e), 'warn') # Hourly stats snapshot now = time.time() if not hasattr(self, '_last_snapshot'): self._last_snapshot = now if (now - self._last_snapshot) >= 3600: try: with self._db_context() as db: dbs.save_stats_snapshot(db, self.stats) self._last_snapshot = now except Exception as e: _log('failed to save stats snapshot: %s' % str(e), 'warn') # Dynamic thread scaling if self.in_background: target = self.scaler.should_scale( len(self.threads), self.job_queue.qsize(), self.stats ) if target != len(self.threads): self._adjust_threads(target) # periodic stale proxy cleanup (daily) if (time.time() - self.last_cleanup) >= 86400: self.cleanup_stale() time.sleep(1) if __name__ == '__main__': _run_standalone = True config.load() errors = config.validate() if errors: for e in errors: _log(e, 'error') import sys sys.exit(1) w = Proxywatchd() try: w.start() w.run() except KeyboardInterrupt as e: pass finally: w.stop() w.finish()